-
Notifications
You must be signed in to change notification settings - Fork 98
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue #3814 - MultiNamespace: Agent should utilize IS_NAMESPACE_SCOPE…
…D environment variable to indicate its scope when deploy the service Signed-off-by: Le Zhang <[email protected]>
- Loading branch information
1 parent
4e01770
commit 1d0ffda
Showing
26 changed files
with
561 additions
and
158 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -319,13 +319,24 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm | |
} | ||
continue | ||
} else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil { | ||
if glog.V(5) { | ||
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cmd msg org matches mine for agreement %v", ag.CurrentAgreementId))) | ||
} | ||
agStillValid := false | ||
policyMatches := true | ||
noNewPriority := false | ||
clusterNSNotChange := true | ||
|
||
if ag.Pattern == "" { | ||
policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph) | ||
policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph) | ||
agStillValid = policyMatches && noNewPriority | ||
if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER { | ||
agStillValid = agStillValid && clusterNSNotChange | ||
} | ||
} | ||
|
||
if glog.V(5) { | ||
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange))) | ||
} | ||
|
||
if !agStillValid { | ||
|
@@ -373,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm | |
|
||
// first bool is true if the policy still matches, false otherwise | ||
// second bool is true unless a higher priority workload than the current one has been added or changed | ||
// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster | ||
// if an error occurs, both will be false | ||
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) { | ||
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) { | ||
if glog.V(5) { | ||
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId))) | ||
} | ||
|
@@ -384,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
for _, svcId := range ag.ServiceId { | ||
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err))) | ||
return false, false | ||
return false, false, false | ||
} else if svcPol != nil { | ||
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false) | ||
} | ||
|
@@ -396,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter) | ||
if err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err))) | ||
return false, false | ||
return false, false, false | ||
} | ||
|
||
nodePolHandler := exchange.GetHTTPNodePolicyHandler(b) | ||
_, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter) | ||
if err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId))) | ||
return false, false | ||
return false, false, false | ||
} | ||
|
||
dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL()) | ||
if err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId))) | ||
return false, false | ||
return false, false, false | ||
} else if dev == nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId))) | ||
return false, false | ||
return false, false, false | ||
} | ||
|
||
nodeArch := dev.Arch | ||
|
@@ -428,30 +440,35 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
// skip for now if not all built-in properties are in the node policy | ||
// this will get called again after the node updates its policy with the built-ins | ||
if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) { | ||
return true, true | ||
return true, true, true | ||
} | ||
|
||
match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil) | ||
|
||
if !match { | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason))) | ||
return false, true | ||
return false, true, false | ||
} | ||
|
||
// don't send an update if the agreement is not finalized yet | ||
if ag.AgreementFinalizedTime == 0 { | ||
return true, true | ||
return true, true, true | ||
} | ||
|
||
// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl | ||
// if it's not in the old policy, cancel | ||
choice := -1 | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, business policy workloads is %v", ag.CurrentAgreementId, busPol.Workloads))) | ||
// [Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: ] | ||
|
||
nextPriority := policy.GetNextWorkloadChoice(busPol.Workloads, choice) | ||
wl := nextPriority | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, nextPriority in business policy is %v", ag.CurrentAgreementId, nextPriority))) | ||
// PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: | ||
|
||
wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName) | ||
wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName) // ag.DeviceId: userdev/an12345, ag.PolicyName: "userdev/bp_location_2.0.6 | ||
if err != nil { | ||
return false, false | ||
return false, false, false | ||
} | ||
// wlUsage is nil if no prioriy is set in the previous policy | ||
wlUsagePriority := 0 | ||
|
@@ -462,20 +479,41 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil { | ||
// the current workload priority is no longer in the deployment policy | ||
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId))) | ||
return true, false | ||
return true, false, false | ||
} else { | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, get current wl with priority is %v", ag.CurrentAgreementId, currentWL))) | ||
// currentWL: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: | ||
wl = currentWL | ||
} | ||
|
||
if oldPolicy != nil { | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, oldPolicy is not null: %v", ag.CurrentAgreementId, oldPolicy))) // Name: userdev/bp_location_2.0.6 Version: 2.0, Pattern: | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, wlUsagePriority: %v, nextPriority: %v", ag.CurrentAgreementId, wlUsagePriority, nextPriority))) | ||
// wlUsagePriority: 0 | ||
// nextPriority: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: [email protected], Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: | ||
|
||
for choice <= wlUsagePriority && nextPriority != nil { | ||
choice = nextPriority.Priority.PriorityValue | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, choice <= wlUsagePriority && nextPriority != nil", ag.CurrentAgreementId))) | ||
choice = nextPriority.Priority.PriorityValue // 0 | ||
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice) | ||
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) { | ||
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId))) | ||
return true, false | ||
return true, false, false | ||
} | ||
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice) | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v inside priority loop, get next workload choice from business policy workload, choice: %v, nextPriority: %v", ag.CurrentAgreementId, choice, nextPriority))) | ||
} | ||
|
||
// check if cluster namespace is changed in new policy | ||
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace { | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId))) | ||
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter) | ||
if !t_comp { | ||
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason))) | ||
|
||
} | ||
// new cluster namespace is still compatible | ||
return true, true, false | ||
} | ||
} | ||
|
||
|
@@ -486,10 +524,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
// populate the workload with the deployment string | ||
if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err))) | ||
return false, false | ||
return false, false, false | ||
} else if svcDef == nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl))) | ||
return false, false | ||
return false, false, false | ||
} else { | ||
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER { | ||
wl.ClusterDeployment = svcDef.GetClusterDeploymentString() | ||
|
@@ -503,14 +541,14 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste | |
newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION) | ||
if err != nil { | ||
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err))) | ||
return false, false | ||
return false, false, false | ||
} | ||
|
||
ag.LastPolicyUpdateTime = uint64(time.Now().Unix()) | ||
|
||
b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph) | ||
|
||
return true, true | ||
return true, true, true | ||
} | ||
|
||
func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) { | ||
|
@@ -581,7 +619,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol | |
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { | ||
for _, ag := range agreements { | ||
if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId { | ||
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph) | ||
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph) | ||
agStillValid := policyMatches && noNewPriority | ||
if !agStillValid { | ||
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) | ||
|
@@ -606,7 +644,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha | |
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { | ||
for _, ag := range agreements { | ||
if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) { | ||
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph) | ||
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph) | ||
agStillValid := policyMatches && noNewPriority | ||
if !agStillValid { | ||
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.