Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue open-horizon#3814 - MultiNamespace: Agent should utilize IS_NAM… #3824

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,
// for cluster type and check for namespace compatibility
consumerNamespace := ""
if nodeType == persistence.DEVICE_TYPE_CLUSTER {
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), false, msgPrinter)
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, exchangeDev.IsNamespaceScoped, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), wi.ConsumerPolicy.PatternId, false, msgPrinter)
if !t_comp {
glog.Warningf(BAWlogstring(workerId, fmt.Sprintf("cannot make agreement with node %v for service %v/%v %v. %v", wi.Device.Id, workload.Org, workload.WorkloadURL, workload.Version, t_reason)))
return
Expand Down
63 changes: 44 additions & 19 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,24 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm
}
continue
} else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cmd msg org matches mine for agreement %v", ag.CurrentAgreementId)))
}
agStillValid := false
policyMatches := true
noNewPriority := false
clusterNSNotChange := true

if ag.Pattern == "" {
policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
agStillValid = policyMatches && noNewPriority
if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER {
agStillValid = agStillValid && clusterNSNotChange
}
}

if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange)))
}

if !agStillValid {
Expand Down Expand Up @@ -373,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm

// first bool is true if the policy still matches, false otherwise
// second bool is true unless a higher priority workload than the current one has been added or changed
// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster
// if an error occurs, both will be false
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) {
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId)))
}
Expand All @@ -384,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
for _, svcId := range ag.ServiceId {
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err)))
return false, false
return false, false, false
} else if svcPol != nil {
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false)
}
Expand All @@ -396,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err)))
return false, false
return false, false, false
}

nodePolHandler := exchange.GetHTTPNodePolicyHandler(b)
_, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL())
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
} else if dev == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

nodeArch := dev.Arch
Expand All @@ -428,30 +440,31 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// skip for now if not all built-in properties are in the node policy
// this will get called again after the node updates its policy with the built-ins
if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) {
return true, true
return true, true, true
}

match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil)

if !match {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason)))
return false, true
return false, true, false
}

// don't send an update if the agreement is not finalized yet
if ag.AgreementFinalizedTime == 0 {
return true, true
return true, true, true
}

// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl
// if it's not in the old policy, cancel
choice := -1

nextPriority := policy.GetNextWorkloadChoice(busPol.Workloads, choice)
wl := nextPriority

wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName)
if err != nil {
return false, false
return false, false, false
}
// wlUsage is nil if no prioriy is set in the previous policy
wlUsagePriority := 0
Expand All @@ -462,7 +475,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil {
// the current workload priority is no longer in the deployment policy
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId)))
return true, false
return true, false, false
} else {
wl = currentWL
}
Expand All @@ -473,10 +486,22 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice)
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId)))
return true, false
return true, false, false
}
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice)
}

// check if cluster namespace is changed in new policy
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId)))
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter)
if !t_comp {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason)))

}
// new cluster namespace is still compatible
return true, true, false
}
}

if wl.Arch == "" || wl.Arch == "*" {
Expand All @@ -486,10 +511,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// populate the workload with the deployment string
if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err)))
return false, false
return false, false, false
} else if svcDef == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl)))
return false, false
return false, false, false
} else {
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER {
wl.ClusterDeployment = svcDef.GetClusterDeploymentString()
Expand All @@ -503,14 +528,14 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
return false, false
return false, false, false
}

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)

return true, true
return true, true, true
}

func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) {
Expand Down Expand Up @@ -581,7 +606,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand All @@ -606,7 +631,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand Down
11 changes: 10 additions & 1 deletion api/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HorizonDevice struct {
Name *string `json:"name,omitempty"`
NodeType *string `json:"nodeType,omitempty"`
ClusterNamespace *string `json:"clusterNamespace"`
NamespaceScoped *bool `json:"NamespaceScoped,omitempty"`
Token *string `json:"token,omitempty"`
TokenLastValidTime *uint64 `json:"token_last_valid_time,omitempty"`
TokenValid *bool `json:"token_valid,omitempty"`
Expand Down Expand Up @@ -69,6 +70,11 @@ func (h HorizonDevice) String() string {
clusterNs = *h.ClusterNamespace
}

isNS := false
if h.NamespaceScoped != nil {
isNS = *h.NamespaceScoped
}

ha_group := ""
if h.HAGroup != nil {
ha_group = *h.HAGroup
Expand All @@ -89,7 +95,7 @@ func (h HorizonDevice) String() string {
tv = *h.TokenValid
}

return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, ha_group, cred, tlvt, tv, h.Config)
return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, NamespaceScoped: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, isNS, ha_group, cred, tlvt, tv, h.Config)
}

// This is a type conversion function but note that the token field within the persistent
Expand Down Expand Up @@ -121,6 +127,9 @@ func ConvertFromPersistentHorizonDevice(pDevice *persistence.ExchangeDevice) *Ho
if pDevice.NodeType == persistence.DEVICE_TYPE_CLUSTER {
ns := cutil.GetClusterNamespace()
hDevice.ClusterNamespace = &ns

isNS := cutil.IsNamespaceScoped()
hDevice.NamespaceScoped = &isNS
}

return &hDevice
Expand Down
8 changes: 8 additions & 0 deletions api/path_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func CreateHorizonDevice(device *HorizonDevice,
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster namespace for the exchange node. %v", err))), nil, nil
}

pdr = exchange.PatchDeviceRequest{}
isNS := cutil.IsNamespaceScoped()
pdr.IsNamespaceScoped = &isNS
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster agent scope for the exchange node. %v", err))), nil, nil
}

}

// Return 2 device objects, the first is the fully populated newly created device object. The second is a device
Expand Down
17 changes: 10 additions & 7 deletions api/path_node_configstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/open-horizon/anax/persistence"
"github.com/open-horizon/anax/policy"
"github.com/open-horizon/anax/semanticversion"
"os"
"strings"
)

Expand Down Expand Up @@ -358,15 +357,19 @@ func getSpecRefsForPattern(nodeType string, patName string,

glog.V(5).Infof(apiLogString(fmt.Sprintf("working with pattern definition %v", patternDef)))

nodeNamespace := os.Getenv("AGENT_NAMESPACE")
// Uncomment this section after exchange add "isNamespaceScoped" field
nodeNamespace := cutil.GetClusterNamespace()
isNamespaceScoped := cutil.IsNamespaceScoped()

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
if nodeNamespace == "" {
// TODO: a better way to detect cluster agent namespace
nodeNamespace = externalpolicy.DEFAULT_NODE_K8S_NAMESPACE
}
if nodeNamespace != externalpolicy.DEFAULT_NODE_K8S_NAMESPACE {
if patternDef.ClusterNamespace != "" && patternDef.ClusterNamespace != nodeNamespace {
return nil, nil, NewSystemError(fmt.Sprintf("Pattern cluster namespace is different from agent namespace. Cluster namespace in pattern is %v, agent namespace is %v", patternDef.ClusterNamespace, nodeNamespace))
}

glog.V(5).Infof(apiLogString(fmt.Sprintf("checking cluster namespace comptibility in pattern %v", patId)))
if err := compcheck.ValidatePatternClusterNamespace(isNamespaceScoped, nodeNamespace, patternDef.ClusterNamespace, patId, nil); err != nil {
return nil, nil, NewSystemError(err.Error())
}
}

Expand Down Expand Up @@ -416,7 +419,7 @@ func getSpecRefsForPattern(nodeType string, patName string,

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
// Ignore service that has namespace conflict
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, nodeNamespace, patternDef.ClusterNamespace, serviceDef.ClusterDeployment, true, nil); !compatible {
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, cutil.GetClusterNamespace(), cutil.IsNamespaceScoped(), patternDef.ClusterNamespace, serviceDef.ClusterDeployment, patId, true, nil); !compatible {
// warning
glog.Infof(apiLogString(fmt.Sprintf("skipping service %v/%v because %v", service.ServiceOrg, service.ServiceURL, reason)))
continue
Expand Down
Loading
Loading