Skip to content

Commit

Permalink
feat: implement handling resources id
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Jul 28, 2023
1 parent 44c85af commit 7b31cef
Show file tree
Hide file tree
Showing 52 changed files with 728 additions and 445 deletions.
2 changes: 1 addition & 1 deletion bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (o *order) shouldBid(group *dtypes.Group) (bool, error) {
return false, nil
}

for _, resources := range group.GroupSpec.GetResources() {
for _, resources := range group.GroupSpec.GetResourceUnits() {
if len(resources.Resources.Storage) > o.cfg.MaxGroupVolumes {
o.log.Info(fmt.Sprintf("unable to fulfill: group volumes count exceeds (%d > %d)", len(resources.Resources.Storage), o.cfg.MaxGroupVolumes))
return false, nil
Expand Down
7 changes: 4 additions & 3 deletions bidengine/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var _ BidPricingStrategy = (*alwaysFailsBidPricingStrategy)(nil)
func makeMocks(s *orderTestScaffold) {
groupResult := &dtypes.QueryGroupResponse{}
groupResult.Group.GroupSpec.Name = "testGroupName"
groupResult.Group.GroupSpec.Resources = make([]dtypes.Resource, 1)
groupResult.Group.GroupSpec.Resources = make(dtypes.ResourceUnits, 1)

cpu := atypes.CPU{}
cpu.Units = atypes.NewResourceValue(uint64(dtypes.GetValidationConfig().MinUnitCPU))
Expand All @@ -77,14 +77,15 @@ func makeMocks(s *orderTestScaffold) {
},
}

clusterResources := atypes.ResourceUnits{
clusterResources := atypes.Resources{
ID: 1,
CPU: &cpu,
GPU: &gpu,
Memory: &memory,
Storage: storage,
}
price := sdk.NewInt64DecCoin(testutil.CoinDenom, 23)
resource := dtypes.Resource{
resource := dtypes.ResourceUnit{
Resources: clusterResources,
Count: 2,
Price: price,
Expand Down
12 changes: 6 additions & 6 deletions bidengine/pricing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func defaultGroupSpecCPUMem() *dtypes.GroupSpec {
gspec := &dtypes.GroupSpec{
Name: "",
Requirements: atypes.PlacementRequirements{},
Resources: make([]dtypes.Resource, 1),
Resources: make(dtypes.ResourceUnits, 1),
}

cpu := atypes.CPU{}
Expand All @@ -70,13 +70,13 @@ func defaultGroupSpecCPUMem() *dtypes.GroupSpec {
memory := atypes.Memory{}
memory.Quantity = atypes.NewResourceValue(10000)

clusterResources := atypes.ResourceUnits{
clusterResources := atypes.Resources{
CPU: &cpu,
Memory: &memory,
}

price := sdk.NewDecCoin("uakt", sdk.NewInt(23))
resource := dtypes.Resource{
resource := dtypes.ResourceUnit{
Resources: clusterResources,
Count: 1,
Price: price,
Expand All @@ -91,10 +91,10 @@ func defaultGroupSpec() *dtypes.GroupSpec {
gspec := &dtypes.GroupSpec{
Name: "",
Requirements: atypes.PlacementRequirements{},
Resources: make([]dtypes.Resource, 1),
Resources: make(dtypes.ResourceUnits, 1),
}

clusterResources := atypes.ResourceUnits{
clusterResources := atypes.Resources{
CPU: &atypes.CPU{
Units: atypes.NewResourceValue(11),
},
Expand All @@ -111,7 +111,7 @@ func defaultGroupSpec() *dtypes.GroupSpec {
},
}
price := sdk.NewDecCoin(testutil.CoinDenom, sdk.NewInt(23))
resource := dtypes.Resource{
resource := dtypes.ResourceUnit{
Resources: clusterResources,
Count: 1,
Price: price,
Expand Down
4 changes: 2 additions & 2 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ type inventory struct {
var _ ctypes.Inventory = (*inventory)(nil)

func (inv *inventory) Adjust(reservation ctypes.ReservationGroup, opts ...ctypes.InventoryOption) error {
resources := make([]types.Resources, len(reservation.Resources().GetResources()))
copy(resources, reservation.Resources().GetResources())
resources := make(dtypes.ResourceUnits, len(reservation.Resources().GetResourceUnits()))
copy(resources, reservation.Resources().GetResourceUnits())

currInventory := inv.dup()

Expand Down
33 changes: 17 additions & 16 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (is *inventoryService) ready() <-chan struct{} {
return is.readych
}

func (is *inventoryService) lookup(order mtypes.OrderID, resources atypes.ResourceGroup) (ctypes.Reservation, error) {
func (is *inventoryService) lookup(order mtypes.OrderID, resources dtypes.ResourceGroup) (ctypes.Reservation, error) {
ch := make(chan inventoryResponse, 1)
req := inventoryRequest{
order: order,
Expand All @@ -155,15 +155,15 @@ func (is *inventoryService) lookup(order mtypes.OrderID, resources atypes.Resour
}
}

func (is *inventoryService) reserve(order mtypes.OrderID, resources atypes.ResourceGroup) (ctypes.Reservation, error) {
for idx, res := range resources.GetResources() {
if res.Resources.CPU == nil {
func (is *inventoryService) reserve(order mtypes.OrderID, resources dtypes.ResourceGroup) (ctypes.Reservation, error) {
for idx, res := range resources.GetResourceUnits() {
if res.CPU == nil {
return nil, fmt.Errorf("%w: CPU resource at idx %d is nil", ErrInvalidResource, idx)
}
if res.Resources.GPU == nil {
if res.GPU == nil {
return nil, fmt.Errorf("%w: GPU resource at idx %d is nil", ErrInvalidResource, idx)
}
if res.Resources.Memory == nil {
if res.Memory == nil {
return nil, fmt.Errorf("%w: Memory resource at idx %d is nil", ErrInvalidResource, idx)
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (is *inventoryService) status(ctx context.Context) (ctypes.InventoryStatus,

type inventoryRequest struct {
order mtypes.OrderID
resources atypes.ResourceGroup
resources dtypes.ResourceGroup
ch chan<- inventoryResponse
}

Expand All @@ -240,11 +240,12 @@ type inventoryResponse struct {
err error
}

func (is *inventoryService) resourcesToCommit(rgroup atypes.ResourceGroup) atypes.ResourceGroup {
replacedResources := make([]dtypes.Resource, 0)
func (is *inventoryService) resourcesToCommit(rgroup dtypes.ResourceGroup) dtypes.ResourceGroup {
replacedResources := make(dtypes.ResourceUnits, 0)

for _, resource := range rgroup.GetResources() {
runits := atypes.ResourceUnits{
for _, resource := range rgroup.GetResourceUnits() {
runits := atypes.Resources{
ID: resource.ID,
CPU: &atypes.CPU{
Units: sdlutil.ComputeCommittedResources(is.config.CPUCommitLevel, resource.Resources.GetCPU().GetUnits()),
Attributes: resource.Resources.GetCPU().GetAttributes(),
Expand Down Expand Up @@ -272,7 +273,7 @@ func (is *inventoryService) resourcesToCommit(rgroup atypes.ResourceGroup) atype

runits.Storage = storage

v := dtypes.Resource{
v := dtypes.ResourceUnit{
Resources: runits,
Count: resource.Count,
Price: sdk.DecCoin{},
Expand Down Expand Up @@ -342,7 +343,7 @@ func updateReservationMetrics(reservations []*reservation) {
memoryTotal = &activeMemoryTotal
endpointsTotal = &activeEndpointsTotal
}
for _, resource := range reservation.Resources().GetResources() {
for _, resource := range reservation.Resources().GetResourceUnits() {
*cpuTotal += float64(resource.Resources.GetCPU().GetUnits().Value() * uint64(resource.Count))
*gpuTotal += float64(resource.Resources.GetGPU().GetUnits().Value() * uint64(resource.Count))
*memoryTotal += float64(resource.Resources.GetMemory().Quantity.Value() * uint64(resource.Count))
Expand Down Expand Up @@ -389,7 +390,7 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
reservation := newReservation(req.order, resourcesToCommit)

{
jReservation, _ := json.Marshal(req.resources.GetResources())
jReservation, _ := json.Marshal(req.resources.GetResourceUnits())
is.log.Debug("reservation requested", "order", req.order, fmt.Sprintf("resources=%s", jReservation))
}

Expand Down Expand Up @@ -727,7 +728,7 @@ func (is *inventoryService) getStatus(state *inventoryServiceState) ctypes.Inven
Storage: make(map[string]int64),
}

for _, resources := range reservation.Resources().GetResources() {
for _, resources := range reservation.Resources().GetResourceUnits() {
total.AddResources(resources)
}

Expand All @@ -751,7 +752,7 @@ func (is *inventoryService) getStatus(state *inventoryServiceState) ctypes.Inven
func reservationCountEndpoints(reservation *reservation) uint {
var externalPortCount uint

resources := reservation.Resources().GetResources()
resources := reservation.Resources().GetResourceUnits()
// Count the number of endpoints per resource. The number of instances does not affect
// the number of ports
for _, resource := range resources {
Expand Down
20 changes: 12 additions & 8 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func newInventory(nodes ...string) ctypes.Inventory {
}

func TestInventory_reservationAllocatable(t *testing.T) {
mkrg := func(cpu uint64, gpu uint64, memory uint64, storage uint64, endpointsCount uint, count uint32) dtypes.Resource {
mkrg := func(cpu uint64, gpu uint64, memory uint64, storage uint64, endpointsCount uint, count uint32) dtypes.ResourceUnit {
endpoints := make([]types.Endpoint, endpointsCount)
return dtypes.Resource{
Resources: types.ResourceUnits{
return dtypes.ResourceUnit{
Resources: types.Resources{
ID: 1,
CPU: &types.CPU{
Units: types.NewResourceValue(cpu),
},
Expand All @@ -91,7 +92,7 @@ func TestInventory_reservationAllocatable(t *testing.T) {
}
}

mkres := func(allocated bool, res ...dtypes.Resource) *reservation {
mkres := func(allocated bool, res ...dtypes.ResourceUnit) *reservation {
return &reservation{
allocated: allocated,
resources: &dtypes.GroupSpec{Resources: res},
Expand Down Expand Up @@ -189,7 +190,8 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {

groupServices[0] = manifest.Service{
Count: 1,
Resources: types.ResourceUnits{
Resources: types.Resources{
ID: 1,
CPU: &types.CPU{
Units: types.NewResourceValue(1),
},
Expand Down Expand Up @@ -327,7 +329,8 @@ func makeInventoryScaffold(t *testing.T, leaseQty uint, inventoryCall bool, node
}
}

deploymentRequirements := types.ResourceUnits{
deploymentRequirements := types.Resources{
ID: 1,
CPU: &types.CPU{
Units: types.NewResourceValue(4000),
},
Expand Down Expand Up @@ -397,7 +400,8 @@ func makeGroupForInventoryTest(sharedHTTP, nodePort, leasedIP bool) manifest.Gro
serviceEndpoints = append(serviceEndpoints, serviceEndpoint)
}

deploymentRequirements := types.ResourceUnits{
deploymentRequirements := types.Resources{
ID: 1,
CPU: &types.CPU{
Units: types.NewResourceValue(4000),
},
Expand Down Expand Up @@ -619,7 +623,7 @@ func TestInventory_OverReservations(t *testing.T) {
}
}

deploymentRequirements := types.ResourceUnits{
deploymentRequirements := types.Resources{
CPU: &types.CPU{
Units: types.NewResourceValue(4000),
},
Expand Down
5 changes: 2 additions & 3 deletions cluster/kube/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
metricsutils "github.com/akash-network/node/util/metrics"

"github.com/akash-network/provider/cluster/kube/builder"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
crdapi "github.com/akash-network/provider/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -154,7 +153,7 @@ func applyService(ctx context.Context, kc kubernetes.Interface, b builder.Servic
return err
}

func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest) (*crd.Manifest, error) {
func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest) error {
obj, err := kc.AkashV2beta2().Manifests(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})

metricsutils.IncCounterVecWithLabelValuesFiltered(kubeCallsCounter, "akash-manifests-get", err, errors.IsNotFound)
Expand All @@ -175,5 +174,5 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest)
}
}

return obj, err
return err
}
48 changes: 47 additions & 1 deletion cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"errors"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -49,6 +50,10 @@ const (
envVarAkashClusterPublicHostname = "AKASH_CLUSTER_PUBLIC_HOSTNAME"
)

var (
ErrKubeBuilder = errors.New("kube-builder")
)

var (
dnsPort = intstr.FromInt(53)
udpProtocol = corev1.Protocol("UDP")
Expand All @@ -73,7 +78,8 @@ func ClusterDeploymentFromDeployment(d ctypes.IDeployment) (IClusterDeployment,
cparams, valid := d.ClusterParams().(crd.ClusterSettings)
if !valid {
// nolint: goerr113
return nil, fmt.Errorf("kube-cluster: unexpected type from ClusterParams(). expected (%s), actual (%s)",
return nil, fmt.Errorf("%w: unexpected type from ClusterParams(). expected (%s), actual (%s)",
ErrKubeBuilder,
reflect.TypeOf(crd.ClusterSettings{}),
reflect.TypeOf(d.ClusterParams()),
)
Expand All @@ -85,6 +91,10 @@ func ClusterDeploymentFromDeployment(d ctypes.IDeployment) (IClusterDeployment,
Sparams: cparams,
}

if err := cd.validate(); err != nil {
return cd, err
}

return cd, nil
}

Expand All @@ -100,6 +110,42 @@ func (d *ClusterDeployment) ClusterParams() crd.ClusterSettings {
return d.Sparams
}

func (d *ClusterDeployment) validate() error {
if len(d.Group.Services) != len(d.Sparams.SchedulerParams) {
return fmt.Errorf("%w: group services count does not match scheduler params count (%d) != (%d)",
ErrKubeBuilder,
len(d.Group.Services),
len(d.Sparams.SchedulerParams))
}

for idx := range d.Group.Services {
svc := d.Group.Services[idx]
sParams := d.Sparams.SchedulerParams[idx]

if svc.Resources.CPU == nil {
return fmt.Errorf("%w: service %s. resource CPU cannot be nil", ErrKubeBuilder, svc.Name)
}

if svc.Resources.GPU == nil {
return fmt.Errorf("%w: service %s. resource GPU cannot be nil", ErrKubeBuilder, svc.Name)
}

if svc.Resources.Memory == nil {
return fmt.Errorf("%w: service %s. resource Memory cannot be nil", ErrKubeBuilder, svc.Name)
}

if svc.Resources.GPU.Units.Value() > 0 {
if sParams == nil ||
sParams.Resources == nil ||
sParams.Resources.GPU == nil {
return fmt.Errorf("%w: service %s. SchedulerParams.Resources.GPU must not be nil when GPU > 0", ErrKubeBuilder, svc.Name)
}
}
}

return nil
}

type builderBase interface {
NS() string
Name() string
Expand Down
3 changes: 1 addition & 2 deletions cluster/kube/builder/netpol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"

manitypes "github.com/akash-network/akash-api/go/manifest/v2beta2"
sdlutil "github.com/akash-network/node/sdl/util"
)

type NetPol interface {
Expand Down Expand Up @@ -165,7 +164,7 @@ func (b *netPol) Create() ([]*netv1.NetworkPolicy, error) { // nolint:golint,unp
portsWithIP = append(portsWithIP, entry)
}

if !expose.Global || sdlutil.ShouldBeIngress(expose) {
if !expose.Global || expose.IsIngress() {
continue
}

Expand Down
Loading

0 comments on commit 7b31cef

Please sign in to comment.