Skip to content

Commit

Permalink
[YUNIKORN-2972] Remove internal object from user and group REST info (#…
Browse files Browse the repository at this point in the history
…993)

Remove the Resource object from the REST response for the user and group
trackers. Removed the usage of the DAO objects in the internal unit
tests. The unit tests use new functions which directly expose the same
structures without using the DAO.

Cleanup:
- remove export from internal functions (UserTracker, GroupTracker)
- renamed functions to match their results (Manager)
- update handler to call renamed functions in Manager
- reimplemented test functions to not use DAO objects (Manager,
  UserTracker, GroupTracker, QueueTracker)
- assert function changes to use new functions removing DAO usage
  (scheduler/utilites_test, scheduler/objects/utilites_test)

Closes: #993

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
wilfred-s authored and craigcondit committed Nov 13, 2024
1 parent 8d139b1 commit d7d1408
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 219 deletions.
24 changes: 14 additions & 10 deletions pkg/scheduler/objects/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,23 @@ func getUserGroup(userName string, groupNameList []string) security.UserGroup {
}

func assertUserGroupResource(t *testing.T, userGroup security.UserGroup, expected *resources.Resource) {
ugm := ugm.GetUserManager()
userResource := ugm.GetUserResources(userGroup)
groupResource := ugm.GetGroupResources(userGroup.Groups[0])
assert.Equal(t, resources.Equals(userResource, expected), true)
assert.Equal(t, resources.Equals(groupResource, nil), true)
assertUserResourcesAndGroupResources(t, userGroup, expected, nil, 0)
}

func assertUserResourcesAndGroupResources(t *testing.T, userGroup security.UserGroup, expectedUserResources *resources.Resource, expectedGroupResources *resources.Resource, i int) {
ugm := ugm.GetUserManager()
userResource := ugm.GetUserResources(userGroup)
groupResource := ugm.GetGroupResources(userGroup.Groups[i])
assert.Equal(t, resources.Equals(userResource, expectedUserResources), true)
assert.Equal(t, resources.Equals(groupResource, expectedGroupResources), true)
m := ugm.GetUserManager()
userResource := m.GetUserResources(userGroup.User)
if expectedUserResources == nil {
assert.Assert(t, userResource.IsEmpty(), "expected empty resource in user tracker")
} else {
assert.Assert(t, resources.Equals(userResource, expectedUserResources), "user value '%s' not equal to expected '%s'", userResource.String(), expectedUserResources.String())
}
groupResource := m.GetGroupResources(userGroup.Groups[i])
if expectedGroupResources == nil {
assert.Assert(t, groupResource.IsEmpty(), "expected empty resource in group tracker")
} else {
assert.Assert(t, resources.Equals(groupResource, expectedGroupResources), "group value '%s' not equal to expected '%s'", groupResource.String(), expectedGroupResources.String())
}
}

func assertAllocationLog(t *testing.T, ask *Allocation) {
Expand Down
65 changes: 48 additions & 17 deletions pkg/scheduler/ugm/group_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package ugm
import (
"strings"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/locking"
Expand Down Expand Up @@ -92,43 +91,46 @@ func (gt *GroupTracker) clearLimits(queuePath string) {
gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), nil, 0, false, group, false)
}

// Note: headroom of queue tracker is not read-only, it also traverses the queue hierarchy and creates childQueueTracker if it does not exist.
// headroom calculate the resource headroom for the group in the hierarchy defined
// Note: headroom of queue tracker is not read-only.
// It traverses the queue hierarchy and creates a childQueueTracker if it does not exist.
func (gt *GroupTracker) headroom(hierarchy []string) *resources.Resource {
gt.Lock()
defer gt.Unlock()
return gt.queueTracker.headroom(hierarchy, group)
}

func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDAOInfo {
// GetResourceUsageDAOInfo returns the DAO object used in the REST API for this group tracker
func (gt *GroupTracker) GetResourceUsageDAOInfo() *dao.GroupResourceUsageDAOInfo {
gt.RLock()
defer gt.RUnlock()
groupResourceUsage := &dao.GroupResourceUsageDAOInfo{
Applications: []string{},
}
groupResourceUsage.GroupName = gt.groupName
var apps []string
for app := range gt.applications {
groupResourceUsage.Applications = append(groupResourceUsage.Applications, app)
apps = append(apps, app)
}
return &dao.GroupResourceUsageDAOInfo{
Applications: apps,
GroupName: gt.groupName,
Queues: gt.queueTracker.getResourceUsageDAOInfo(),
}
groupResourceUsage.Queues = gt.queueTracker.getResourceUsageDAOInfo(common.Empty)
return groupResourceUsage
}

func (gt *GroupTracker) IsQueuePathTrackedCompletely(hierarchy []string) bool {
func (gt *GroupTracker) isQueuePathTrackedCompletely(hierarchy []string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.IsQueuePathTrackedCompletely(hierarchy)
return gt.queueTracker.isQueuePathTrackedCompletely(hierarchy)
}

func (gt *GroupTracker) IsUnlinkRequired(hierarchy []string) bool {
func (gt *GroupTracker) isUnlinkRequired(hierarchy []string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.IsUnlinkRequired(hierarchy)
return gt.queueTracker.isUnlinkRequired(hierarchy)
}

func (gt *GroupTracker) UnlinkQT(hierarchy []string) bool {
func (gt *GroupTracker) unlinkQT(hierarchy []string) bool {
gt.Lock()
defer gt.Unlock()
return gt.queueTracker.UnlinkQT(hierarchy)
return gt.queueTracker.unlink(hierarchy)
}

func (gt *GroupTracker) canBeRemoved() bool {
Expand All @@ -153,9 +155,38 @@ func (gt *GroupTracker) decreaseAllTrackedResourceUsage(hierarchy []string) map[
return removedApplications
}

// Note: canRunApp of queue tracker is not read-only, it also traverses the queue hierarchy and creates a childQueueTracker if it does not exist.
// canRunApp checks if the group is allowed to run the application in the queue defined in hierarchy.
// Note: canRunApp of queue tracker is not read-only,
// It traverses the queue hierarchy and creates a childQueueTracker if it does not exist.
func (gt *GroupTracker) canRunApp(hierarchy []string, applicationID string) bool {
gt.Lock()
defer gt.Unlock()
return gt.queueTracker.canRunApp(hierarchy, applicationID, group)
}

// GetMaxResources returns a map of the maxResources for all queues registered under this group tracker.
// The key into the map is the queue path.
// This should only be used in test
func (gt *GroupTracker) GetMaxResources() map[string]*resources.Resource {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.getMaxResources()
}

// GetMaxApplications returns a map of the maxRunningApps for all queues registered under this group tracker.
// The key into the map is the queue path.
// This should only be used in test
func (gt *GroupTracker) GetMaxApplications() map[string]uint64 {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.getMaxApplications()
}

// getUsedResources returns a map of the usedResources for all queues registered under this group tracker.
// The key into the map is the queue path.
// This should only be used in test
func (gt *GroupTracker) getUsedResources() map[string]*resources.Resource {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.getUsedResources()
}
13 changes: 3 additions & 10 deletions pkg/scheduler/ugm/group_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3)
}
groupTracker.increaseTrackedResource(path4, TestApp4, usage4, user.User)
actualResources := getGroupResource(groupTracker)

actualResources := groupTracker.queueTracker.getUsedResources()
assert.Equal(t, "map[mem:80000000 vcore:80000]", actualResources["root"].String(), "wrong resource")
assert.Equal(t, "map[mem:80000000 vcore:80000]", actualResources["root.parent"].String(), "wrong resource")
assert.Equal(t, "map[mem:40000000 vcore:40000]", actualResources["root.parent.child1"].String(), "wrong resource")
Expand Down Expand Up @@ -104,9 +104,9 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2)
}
groupTracker.increaseTrackedResource(path2, TestApp2, usage2, user.User)
actualResources := getGroupResource(groupTracker)

assert.Equal(t, 2, len(groupTracker.getTrackedApplications()))
actualResources := groupTracker.getUsedResources()
assert.Equal(t, "map[mem:90000000 vcore:90000]", actualResources["root"].String(), "wrong resource")
assert.Equal(t, "map[mem:90000000 vcore:90000]", actualResources["root.parent"].String(), "wrong resource")
assert.Equal(t, "map[mem:70000000 vcore:70000]", actualResources["root.parent.child1"].String(), "wrong resource")
Expand All @@ -126,8 +126,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
removeQT = groupTracker.decreaseTrackedResource(path2, TestApp2, usage3, false)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")

actualResources1 := getGroupResource(groupTracker)

actualResources1 := groupTracker.getUsedResources()
assert.Equal(t, "map[mem:70000000 vcore:70000]", actualResources1["root"].String(), "wrong resource")
assert.Equal(t, "map[mem:70000000 vcore:70000]", actualResources1["root.parent"].String(), "wrong resource")
assert.Equal(t, "map[mem:60000000 vcore:60000]", actualResources1["root.parent.child1"].String(), "wrong resource")
Expand Down Expand Up @@ -282,9 +281,3 @@ func TestGTCanRunApp(t *testing.T) {
assert.Assert(t, groupTracker.canRunApp(hierarchy1, TestApp1))
assert.Assert(t, !groupTracker.canRunApp(hierarchy1, TestApp2))
}

func getGroupResource(gt *GroupTracker) map[string]*resources.Resource {
resources := make(map[string]*resources.Resource)
usage := gt.GetGroupResourceUsageDAOInfo()
return internalGetResource(usage.Queues, resources)
}
38 changes: 20 additions & 18 deletions pkg/scheduler/ugm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, applicationID string, usage
}
}

func (m *Manager) GetUsersResources() []*UserTracker {
func (m *Manager) GetUserTrackers() []*UserTracker {
m.RLock()
defer m.RUnlock()
var userTrackers []*UserTracker
Expand All @@ -205,7 +205,7 @@ func (m *Manager) GetUserTracker(user string) *UserTracker {
return m.userTrackers[user]
}

func (m *Manager) GetGroupsResources() []*GroupTracker {
func (m *Manager) GetGroupTrackers() []*GroupTracker {
m.RLock()
defer m.RUnlock()
var groupTrackers []*GroupTracker
Expand Down Expand Up @@ -496,14 +496,14 @@ func (m *Manager) clearEarlierSetUserLimits(newUserLimits map[string]map[string]
func (m *Manager) resetUserEarlierUsage(ut *UserTracker, queuePath string) {
// Is this user already tracked for the queue path?
hierarchy := strings.Split(queuePath, configs.DOT)
if ut.IsQueuePathTrackedCompletely(hierarchy) {
if ut.isQueuePathTrackedCompletely(hierarchy) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
zap.String("user", ut.userName),
zap.Strings("queue path", hierarchy))
ut.clearLimits(queuePath, false)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(hierarchy) {
ut.UnlinkQT(hierarchy)
if ut.isUnlinkRequired(hierarchy) {
ut.unlinkQT(hierarchy)
}
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for user",
zap.String("user", ut.userName),
Expand Down Expand Up @@ -544,7 +544,7 @@ func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits map[string]map[strin
// eventually remove group tracker object itself from ugm if it can be removed.
func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
hierarchy := strings.Split(queuePath, configs.DOT)
if gt.IsQueuePathTrackedCompletely(hierarchy) {
if gt.isQueuePathTrackedCompletely(hierarchy) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
zap.String("group", gt.groupName),
zap.Strings("queue path", hierarchy))
Expand All @@ -555,8 +555,8 @@ func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
}
gt.clearLimits(queuePath)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(hierarchy) {
gt.UnlinkQT(hierarchy)
if gt.isUnlinkRequired(hierarchy) {
gt.unlinkQT(hierarchy)
}
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for group",
zap.String("group", gt.groupName),
Expand Down Expand Up @@ -710,24 +710,26 @@ func (m *Manager) ClearConfigLimits() {
m.groupLimits = make(map[string]map[string]*LimitConfig)
}

// GetUserResources only for tests
func (m *Manager) GetUserResources(user security.UserGroup) *resources.Resource {
// GetUserResources returns the root queue maxResources for the user
// Should only be used in tests
func (m *Manager) GetUserResources(user string) *resources.Resource {
m.RLock()
defer m.RUnlock()
ut := m.userTrackers[user.User]
if ut != nil && ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage != nil && len(ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
return ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage
ut := m.userTrackers[user]
if ut == nil {
return nil
}
return nil
return ut.queueTracker.resourceUsage.Clone()
}

// GetGroupResources only for tests
// GetGroupResources returns the root queue maxResources
// Should only be used in tests
func (m *Manager) GetGroupResources(group string) *resources.Resource {
m.RLock()
defer m.RUnlock()
gt := m.groupTrackers[group]
if gt != nil && gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage != nil && len(gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
return gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage
if gt == nil {
return nil
}
return nil
return gt.queueTracker.resourceUsage.Clone()
}
47 changes: 24 additions & 23 deletions pkg/scheduler/ugm/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
manager.IncreaseTrackedResource("", "", usage1, user)
manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user)

groupTrackers := manager.GetGroupsResources()
groupTrackers := manager.GetGroupTrackers()
assert.Equal(t, len(groupTrackers), 0)
assertUGM(t, user, usage1, 1)
assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName)
Expand All @@ -213,19 +213,19 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName)
assert.Equal(t, user1.User, manager.GetUserTracker(user1.User).userName)

assert.Equal(t, true, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user1.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user1.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT)))

assert.Equal(t, true, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, true, manager.GetUserTracker(user1.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user1.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT)))
assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.isQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT)))

usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"})
if err != nil {
Expand All @@ -237,12 +237,12 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
assertUGM(t, user, usage1, 2)

manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user, true)
assert.Equal(t, 1, len(manager.GetUsersResources()), "userTrackers count should be 1")
assert.Equal(t, 0, len(manager.GetGroupsResources()), "groupTrackers count should be 0")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0")

manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1, true)
assert.Equal(t, 0, len(manager.GetUsersResources()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetGroupsResources()), "groupTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetUserTrackers()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0")

assert.Assert(t, manager.GetUserTracker(user.User) == nil)
assert.Assert(t, manager.GetGroupTracker(user.Groups[0]) == nil)
Expand Down Expand Up @@ -1979,12 +1979,13 @@ func setupUGM() {

func assertUGM(t *testing.T, userGroup security.UserGroup, expected *resources.Resource, usersCount int) {
manager := GetUserManager()
assert.Equal(t, usersCount, len(manager.GetUsersResources()), "userTrackers count should be "+strconv.Itoa(usersCount))
assert.Equal(t, 0, len(manager.GetGroupsResources()), "groupTrackers count should be "+strconv.Itoa(0))
userRes := manager.GetUserResources(userGroup)
assert.Equal(t, resources.Equals(userRes, expected), true)
groupRes := manager.GetGroupResources(userGroup.Groups[0])
assert.Equal(t, resources.Equals(groupRes, nil), true)
assert.Equal(t, usersCount, len(manager.GetUserTrackers()), "userTrackers count not as expected")
assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0")
userTR := manager.GetUserTracker(userGroup.User)
assert.Assert(t, userTR != nil, "user tracker should be defined")
assert.Assert(t, resources.Equals(userTR.queueTracker.resourceUsage, expected), "user max resource for root not correct")
groupTR := manager.GetGroupTracker(userGroup.Groups[0])
assert.Assert(t, groupTR == nil, "group tracker should not be defined")
}

func assertMaxLimits(t *testing.T, userGroup security.UserGroup, expectedResource *resources.Resource, expectedMaxApps int) {
Expand Down
Loading

0 comments on commit d7d1408

Please sign in to comment.