Skip to content

Commit

Permalink
Issue #3814 - MultiNamespace: Agent should utilize IS_NAMESPACE_SCOPE…
Browse files Browse the repository at this point in the history
…D environment variable to indicate its scope when deploy the service

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Jul 27, 2023
1 parent 4e01770 commit ce12f8a
Show file tree
Hide file tree
Showing 26 changed files with 587 additions and 156 deletions.
2 changes: 1 addition & 1 deletion agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,
// for cluster type and check for namespace compatibility
consumerNamespace := ""
if nodeType == persistence.DEVICE_TYPE_CLUSTER {
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), false, msgPrinter)
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, exchangeDev.IsNamespaceScoped, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), wi.ConsumerPolicy.PatternId, false, msgPrinter)
if !t_comp {
glog.Warningf(BAWlogstring(workerId, fmt.Sprintf("cannot make agreement with node %v for service %v/%v %v. %v", wi.Device.Id, workload.Org, workload.WorkloadURL, workload.Version, t_reason)))
return
Expand Down
80 changes: 59 additions & 21 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,24 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm
}
continue
} else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cmd msg org matches mine for agreement %v", ag.CurrentAgreementId)))
}
agStillValid := false
policyMatches := true
noNewPriority := false
clusterNSNotChange := true

if ag.Pattern == "" {
policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
agStillValid = policyMatches && noNewPriority
if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER {
agStillValid = agStillValid && clusterNSNotChange
}
}

if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange)))
}

if !agStillValid {
Expand Down Expand Up @@ -373,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm

// first bool is true if the policy still matches, false otherwise
// second bool is true unless a higher priority workload than the current one has been added or changed
// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster
// if an error occurs, both will be false
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) {
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId)))
}
Expand All @@ -384,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
for _, svcId := range ag.ServiceId {
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err)))
return false, false
return false, false, false
} else if svcPol != nil {
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false)
}
Expand All @@ -396,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err)))
return false, false
return false, false, false
}

nodePolHandler := exchange.GetHTTPNodePolicyHandler(b)
_, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL())
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
} else if dev == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

nodeArch := dev.Arch
Expand All @@ -428,30 +440,35 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// skip for now if not all built-in properties are in the node policy
// this will get called again after the node updates its policy with the built-ins
if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) {
return true, true
return true, true, true
}

match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil)

if !match {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason)))
return false, true
return false, true, false
}

// don't send an update if the agreement is not finalized yet
if ag.AgreementFinalizedTime == 0 {
return true, true
return true, true, true
}

// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl
// if it's not in the old policy, cancel
choice := -1
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, business policy workloads is %v", ag.CurrentAgreementId, busPol.Workloads)))
// [Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: ]

nextPriority := policy.GetNextWorkloadChoice(busPol.Workloads, choice)
wl := nextPriority
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, nextPriority in business policy is %v", ag.CurrentAgreementId, nextPriority)))
// PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature:

wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName)
wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName) // ag.DeviceId: userdev/an12345, ag.PolicyName: "userdev/bp_location_2.0.6
if err != nil {
return false, false
return false, false, false
}
// wlUsage is nil if no prioriy is set in the previous policy
wlUsagePriority := 0
Expand All @@ -462,20 +479,41 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil {
// the current workload priority is no longer in the deployment policy
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId)))
return true, false
return true, false, false
} else {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, get current wl with priority is %v", ag.CurrentAgreementId, currentWL)))
// currentWL: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature:
wl = currentWL
}

if oldPolicy != nil {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, oldPolicy is not null: %v", ag.CurrentAgreementId, oldPolicy))) // Name: userdev/bp_location_2.0.6 Version: 2.0, Pattern:
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, wlUsagePriority: %v, nextPriority: %v", ag.CurrentAgreementId, wlUsagePriority, nextPriority)))
// wlUsagePriority: 0
// nextPriority: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature:

for choice <= wlUsagePriority && nextPriority != nil {
choice = nextPriority.Priority.PriorityValue
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, choice <= wlUsagePriority && nextPriority != nil", ag.CurrentAgreementId)))
choice = nextPriority.Priority.PriorityValue // 0
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice)
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId)))
return true, false
return true, false, false
}
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice)
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v inside priority loop, get next workload choice from business policy workload, choice: %v, nextPriority: %v", ag.CurrentAgreementId, choice, nextPriority)))
}

// check if cluster namespace is changed in new policy
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId)))
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter)
if !t_comp {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason)))

}
// new cluster namespace is still compatible
return true, true, false
}
}

Expand All @@ -486,10 +524,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// populate the workload with the deployment string
if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err)))
return false, false
return false, false, false
} else if svcDef == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl)))
return false, false
return false, false, false
} else {
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER {
wl.ClusterDeployment = svcDef.GetClusterDeploymentString()
Expand All @@ -503,14 +541,14 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
return false, false
return false, false, false
}

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)

return true, true
return true, true, true
}

func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) {
Expand Down Expand Up @@ -581,7 +619,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand All @@ -606,7 +644,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand Down
11 changes: 10 additions & 1 deletion api/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HorizonDevice struct {
Name *string `json:"name,omitempty"`
NodeType *string `json:"nodeType,omitempty"`
ClusterNamespace *string `json:"clusterNamespace"`
NamespaceScoped *bool `json:"NamespaceScoped,omitempty"`
Token *string `json:"token,omitempty"`
TokenLastValidTime *uint64 `json:"token_last_valid_time,omitempty"`
TokenValid *bool `json:"token_valid,omitempty"`
Expand Down Expand Up @@ -69,6 +70,11 @@ func (h HorizonDevice) String() string {
clusterNs = *h.ClusterNamespace
}

isNS := false
if h.NamespaceScoped != nil {
isNS = *h.NamespaceScoped
}

ha_group := ""
if h.HAGroup != nil {
ha_group = *h.HAGroup
Expand All @@ -89,7 +95,7 @@ func (h HorizonDevice) String() string {
tv = *h.TokenValid
}

return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, ha_group, cred, tlvt, tv, h.Config)
return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, NamespaceScoped: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, isNS, ha_group, cred, tlvt, tv, h.Config)
}

// This is a type conversion function but note that the token field within the persistent
Expand Down Expand Up @@ -121,6 +127,9 @@ func ConvertFromPersistentHorizonDevice(pDevice *persistence.ExchangeDevice) *Ho
if pDevice.NodeType == persistence.DEVICE_TYPE_CLUSTER {
ns := cutil.GetClusterNamespace()
hDevice.ClusterNamespace = &ns

isNS := cutil.IsNamespaceScoped()
hDevice.NamespaceScoped = &isNS
}

return &hDevice
Expand Down
8 changes: 8 additions & 0 deletions api/path_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func CreateHorizonDevice(device *HorizonDevice,
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster namespace for the exchange node. %v", err))), nil, nil
}

pdr = exchange.PatchDeviceRequest{}
isNS := cutil.IsNamespaceScoped()
pdr.IsNamespaceScoped = &isNS
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster agent scope for the exchange node. %v", err))), nil, nil
}

}

// Return 2 device objects, the first is the fully populated newly created device object. The second is a device
Expand Down
30 changes: 23 additions & 7 deletions api/path_node_configstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/open-horizon/anax/persistence"
"github.com/open-horizon/anax/policy"
"github.com/open-horizon/anax/semanticversion"
"os"
"strings"
)

Expand Down Expand Up @@ -358,16 +357,33 @@ func getSpecRefsForPattern(nodeType string, patName string,

glog.V(5).Infof(apiLogString(fmt.Sprintf("working with pattern definition %v", patternDef)))

nodeNamespace := os.Getenv("AGENT_NAMESPACE")
// Uncomment this section after exchange add "isNamespaceScoped" field
nodeNamespace := cutil.GetClusterNamespace()
isNamespaceScoped := cutil.IsNamespaceScoped()

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
if nodeNamespace == "" {
// TODO: a better way to detect cluster agent namespace
nodeNamespace = externalpolicy.DEFAULT_NODE_K8S_NAMESPACE
}
if nodeNamespace != externalpolicy.DEFAULT_NODE_K8S_NAMESPACE {
if patternDef.ClusterNamespace != "" && patternDef.ClusterNamespace != nodeNamespace {
return nil, nil, NewSystemError(fmt.Sprintf("Pattern cluster namespace is different from agent namespace. Cluster namespace in pattern is %v, agent namespace is %v", patternDef.ClusterNamespace, nodeNamespace))
}

glog.V(5).Infof(apiLogString(fmt.Sprintf("checking cluster namespace comptibility in pattern %v", patId)))
if err := compcheck.ValidatePatternClusterNamespace(isNamespaceScoped, nodeNamespace, patternDef.ClusterNamespace, patId, nil); err != nil {
return nil, nil, NewSystemError(err.Error())
}
/*
if isNamespaceScoped {
if patternDef.ClusterNamespace == "" {
return nil, nil, NewSystemError(fmt.Sprintf("Namespace scoped agent cannot register a pattern with empty cluster namespace"))
} else if patternDef.ClusterNamespace != "" && patternDef.ClusterNamespace != nodeNamespace {
return nil, nil, NewSystemError(fmt.Sprintf("Pattern cluster namespace is different from agent namespace. Cluster namespace in pattern is %v, agent namespace is %v", patternDef.ClusterNamespace, nodeNamespace))
}
} else {
// is cluster scope
if patternDef.ClusterNamespace != "" {
return nil, nil, NewSystemError(fmt.Sprintf("Cluster namespace in pattern is %v, only pattern with empty clsuter namespace can be registered for cluster scoped agent", patternDef.ClusterNamespace))
}
} */
}

// For each workload/top-level service in the pattern, resolve it to a list of required services.
Expand Down Expand Up @@ -416,7 +432,7 @@ func getSpecRefsForPattern(nodeType string, patName string,

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
// Ignore service that has namespace conflict
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, nodeNamespace, patternDef.ClusterNamespace, serviceDef.ClusterDeployment, true, nil); !compatible {
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, cutil.GetClusterNamespace(), cutil.IsNamespaceScoped(), patternDef.ClusterNamespace, serviceDef.ClusterDeployment, patId, true, nil); !compatible {
// warning
glog.Infof(apiLogString(fmt.Sprintf("skipping service %v/%v because %v", service.ServiceOrg, service.ServiceURL, reason)))
continue
Expand Down
Loading

0 comments on commit ce12f8a

Please sign in to comment.