Skip to content

Commit

Permalink
Merge pull request #3884 from LiilyZhang/zhangl/secretsForCluster
Browse files Browse the repository at this point in the history
Issue 3892 - Support Service secrets for cluster agent
  • Loading branch information
LiilyZhang authored Sep 5, 2023
2 parents b94dbd7 + 997b803 commit 1ed07af
Show file tree
Hide file tree
Showing 22 changed files with 689 additions and 86 deletions.
2 changes: 1 addition & 1 deletion agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,

// Make sure the deployment policy or pattern has all the right secret bindings in place and extract the secret details for the agent.
secrets_match := true
if policy_match && userInput_match && nodeType == persistence.DEVICE_TYPE_DEVICE {
if policy_match && userInput_match {

err := b.ValidateAndExtractSecrets(&wi.ConsumerPolicy, wi.Device.Id, &topSvcDef, depServices, workerId, msgPrinter)
if err != nil {
Expand Down
48 changes: 47 additions & 1 deletion common/deploymentconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,52 @@ func ConvertToDeploymentConfig(deployment interface{}, msgPrinter *message.Print
return depConfig, nil
}

type ClusterDeploymentConfig struct {
Metadata map[string]interface{} `json:"metadata,omitempty"`
OperatorYamlArchive string `json:"operatorYamlArchive"`
Secrets map[string]containermessage.Secret `json:"secrets"`
}

// Take the deployment field, which we have told the json unmarshaller was unknown type (so we can handle both escaped string and struct)
// and turn it into the DeploymentConfig struct we really want.
func ConvertToClusterDeploymentConfig(clusterDeployment interface{}, msgPrinter *message.Printer) (*ClusterDeploymentConfig, error) {
// get default message printer if nil
if msgPrinter == nil {
msgPrinter = i18n.GetMessagePrinter()
}

var jsonBytes []byte
var err error

// Take whatever type the deployment field is and convert it to marshalled json bytes
switch d := clusterDeployment.(type) {
case string:
if len(d) == 0 {
return nil, nil
}
// In the original input file this was escaped json as a string, but the original unmarshal removed the escapes
jsonBytes = []byte(d)
case nil:
return nil, nil
default:
// The only other valid input is regular json in ClusterDeploymentConfig structure. Marshal it back to bytes so we can unmarshal it in a way that lets Go know it is a DeploymentConfig
jsonBytes, err = json.Marshal(d)
if err != nil {
return nil, fmt.Errorf(msgPrinter.Sprintf("failed to marshal body for %v: %v", d, err))
}
}

// Now unmarshal the bytes into the struct we have wanted all along
clusterDepConfig := new(ClusterDeploymentConfig)
err = json.Unmarshal(jsonBytes, clusterDepConfig)
if err != nil {
return nil, fmt.Errorf(msgPrinter.Sprintf("failed to unmarshal json for deployment field %s: %v", string(jsonBytes), err))
}

return clusterDepConfig, nil

}

// Get the metadata filed from the cluster deployment config
// inspectOperatorForNS: get the namespace from the operator if 'metadata' attribute is not defined.
func GetClusterDeploymentMetadata(clusterDeployment interface{}, inspectOperatorForNS bool, msgPrinter *message.Printer) (map[string]interface{}, error) {
Expand Down Expand Up @@ -175,6 +221,6 @@ func GetClusterDeploymentMetadata(clusterDeployment interface{}, inspectOperator
}

func GetKubeOperatorNamespace(tar string) (string, error) {
_, namespace, err := kube_operator.ProcessDeployment(tar, nil, map[string]string{}, "", 0)
_, namespace, err := kube_operator.ProcessDeployment(tar, nil, map[string]string{}, map[string]string{}, "", 0)
return namespace, err
}
75 changes: 45 additions & 30 deletions compcheck/secretbinding_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"github.com/open-horizon/anax/businesspolicy"
"github.com/open-horizon/anax/common"
"github.com/open-horizon/anax/containermessage"
"github.com/open-horizon/anax/cutil"
"github.com/open-horizon/anax/exchange"
"github.com/open-horizon/anax/exchangecommon"
"github.com/open-horizon/anax/i18n"
"github.com/open-horizon/anax/persistence"
"github.com/open-horizon/anax/semanticversion"
"golang.org/x/text/message"
"strings"
Expand Down Expand Up @@ -151,8 +151,6 @@ func secretBindingCompatible(getDeviceHandler exchange.DeviceHandler,
// node id or from the input.
if nodeType, err := VerifyNodeType(input.NodeType, resources.NodeType, nodeId, msgPrinter); err != nil {
return nil, err
} else if nodeType == persistence.DEVICE_TYPE_CLUSTER {
return nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Node type '%v' does not support secret binding check.", nodeType)), COMPCHECK_INPUT_ERROR)
} else {
resources.NodeType = nodeType
}
Expand Down Expand Up @@ -583,17 +581,15 @@ func ValidateSecretBindingForSingleService(secretBinding []exchangecommon.Secret
return index, nil, err
}

// cluster type does not have secrets
var dConfig *common.DeploymentConfig
var cdConfig *common.ClusterDeploymentConfig
// convert the deployment string into object
if sdef.GetServiceType() == exchangecommon.SERVICE_TYPE_CLUSTER {
if index == -1 {
return index, nil, nil
} else {
return index, nil, fmt.Errorf(msgPrinter.Sprintf("Secret binding for a cluster service is not supported."))
}
cdConfig, err = common.ConvertToClusterDeploymentConfig(sdef.GetClusterDeployment(), msgPrinter)
} else {
dConfig, err = common.ConvertToDeploymentConfig(sdef.GetDeployment(), msgPrinter)
}

// convert the deployment string into object
dConfig, err := common.ConvertToDeploymentConfig(sdef.GetDeployment(), msgPrinter)
if err != nil {
return index, nil, err
}
Expand All @@ -607,26 +603,16 @@ func ValidateSecretBindingForSingleService(secretBinding []exchangecommon.Secret
noBinding := map[string]bool{}
if dConfig != nil {
for _, svcConf := range dConfig.Services {
for sn, _ := range svcConf.Secrets {
found := false
if index != -1 {
for _, vbind := range secretBinding[index].Secrets {
key, vs := vbind.GetBinding()
if sn == key {
found = true
if _, _, _, err := ParseVaultSecretName(vs, msgPrinter); err != nil {
return index, nil, err
}
sbNeeded[sn] = true
break
}
}
}
if !found {
noBinding[sn] = true
}
}
sbNeeded, noBinding, err = checkSecretsInDeploymentConfig(secretBinding, svcConf.Secrets, index, msgPrinter)

}
} else if cdConfig != nil {

sbNeeded, noBinding, err = checkSecretsInDeploymentConfig(secretBinding, cdConfig.Secrets, index, msgPrinter)
}

if err != nil {
return index, nil, err
}

if len(noBinding) > 0 {
Expand All @@ -649,6 +635,35 @@ func ValidateSecretBindingForSingleService(secretBinding []exchangecommon.Secret
return index, used_sb, nil
}

func checkSecretsInDeploymentConfig(secretBinding []exchangecommon.SecretBinding, Secrets map[string]containermessage.Secret, index int, msgPrinter *message.Printer) (map[string]bool, map[string]bool, error) {
sbNeeded := map[string]bool{}

// make sure each service secret has a binding
noBinding := map[string]bool{}

for sn, _ := range Secrets {
found := false
if index != -1 {
for _, vbind := range secretBinding[index].Secrets {
key, vs := vbind.GetBinding()
if sn == key {
found = true
if _, _, _, err := ParseVaultSecretName(vs, msgPrinter); err != nil {
return map[string]bool{}, map[string]bool{}, err
}
sbNeeded[sn] = true
break
}
}
}
if !found {
noBinding[sn] = true
}
}

return sbNeeded, noBinding, nil
}

// Given a list of SecretBinding's for multiples services, return index for
// the secret binding object in the given array that will be used by the given service.
// -1 means no secret binding defined for the given service
Expand Down
2 changes: 1 addition & 1 deletion container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func CreateCLIContainerWorker(config *config.HorizonConfig) (*ContainerWorker, e
client: client,
iptables: nil,
authMgr: resource.NewAuthenticationManager(config.GetFileSyncServiceAuthPath()),
secretMgr: resource.NewSecretsManager(config.GetSecretsManagerFilePath(), nil),
secretMgr: resource.NewSecretsManager(config, nil),
pattern: "",
isDevInstance: true,
apiServerType: svType,
Expand Down
42 changes: 41 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ const (
CHANGE_HA_GROUP EventId = "EXCHANGE_CHANGE_HA_GROUP"

// Secret related
UPDATED_SECRETS EventId = "SECRET_UPDATES"
UPDATED_SECRETS EventId = "SECRET_UPDATES"
UPDATE_SECRETS_IN_AGREEMENT EventId = "AGREEMENT_UPDATE_SECRETS"

// ESS related
ESS_UNCONFIG EventId = "ESS_UNCONFIG"
Expand Down Expand Up @@ -2176,6 +2177,45 @@ func NewSecretUpdatesMessage(id EventId, sus *SecretUpdates) *SecretUpdatesMessa
}
}

type WorkloadUpdateMessage struct {
event Event
//Message WorkloadAgreementUpdateMessage
AgreementProtocol string
AgreementId string
ClusterNamespaceInAgreement string
Deployment persistence.DeploymentConfig
SecretsUpdate []persistence.PersistedServiceSecret
}

func (w *WorkloadUpdateMessage) Event() Event {
return w.event
}

func (w *WorkloadUpdateMessage) String() string {
depStr := ""
if w.Deployment != nil {
depStr = w.Deployment.ToString()
}
return fmt.Sprintf("event: %v, AgreementProtocol: %v, agreementId: %v, clusterNamespaceInAgreement: %v, clusterDeployment: %v, secretsUpdate: %v", w.event, w.AgreementProtocol, w.AgreementId, w.ClusterNamespaceInAgreement, depStr, w.SecretsUpdate)
}

func (w *WorkloadUpdateMessage) ShortString() string {
return w.String()
}

func NewWorkloadUpdateMessage(id EventId, agreementId string, protocol string, clusterNamespaceInAgreement string, deployment persistence.DeploymentConfig, secretsUpdate []persistence.PersistedServiceSecret) *WorkloadUpdateMessage {
return &WorkloadUpdateMessage{
event: Event{
Id: id,
},
AgreementId: agreementId,
AgreementProtocol: protocol,
ClusterNamespaceInAgreement: clusterNamespaceInAgreement,
Deployment: deployment,
SecretsUpdate: secretsUpdate,
}
}

type NMPStartDownloadMessage struct {
event Event
Message StartDownloadMessage
Expand Down
24 changes: 18 additions & 6 deletions governance/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/boltdb/bolt"
"github.com/golang/glog"
"github.com/open-horizon/anax/abstractprotocol"
"github.com/open-horizon/anax/basicprotocol"
"github.com/open-horizon/anax/cache"
"github.com/open-horizon/anax/config"
"github.com/open-horizon/anax/cutil"
Expand Down Expand Up @@ -1106,7 +1107,7 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
} else {

// Allow the message extension handler to see the message
handled, cancel, agid, err := w.producerPH[msgProtocol].HandleExtensionMessages(&cmd.Msg, exchangeMsg)
handled, cancel, agid, updatedSecs, err := w.producerPH[msgProtocol].HandleExtensionMessages(&cmd.Msg, exchangeMsg)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("unable to handle message %v , error: %v", protocolMsg, err)))
} else if cancel {
Expand Down Expand Up @@ -1156,6 +1157,15 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
if err != nil {
glog.Errorf(logString(fmt.Sprintf("encountered error updating agreement %v, error %v", ags[0].CurrentAgreementId, err)))
}

if len(updatedSecs) != 0 {
clusterNamespaceInAg, err := w.GetRequestedClusterNamespaceFromAg(&ags[0])
if err != nil {
glog.Errorf(logString(fmt.Sprintf("Failed to get cluster namespace from agreeent %v. %v", ags[0].CurrentAgreementId, err)))
}
// have updatedSecs, send out an event to let kube worker know about the secret update
w.Messages() <- events.NewWorkloadUpdateMessage(events.UPDATE_SECRETS_IN_AGREEMENT, agid, msgProtocol, clusterNamespaceInAg, ags[0].GetDeploymentConfig(), updatedSecs)
}
}
}

Expand Down Expand Up @@ -1261,7 +1271,7 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
if agreement, err := persistence.AgreementStateAgreementProtocolTerminated(w.db, cmd.AgreementId, cmd.AgreementProtocol); err != nil {
glog.Errorf(logString(fmt.Sprintf("error marking agreement %v agreement protocol terminated: %v", cmd.AgreementId, err)))
} else {
if agreement.WorkloadTerminatedTime != 0 {
if agreement.TerminatedReason == basicprotocol.CANCEL_NOT_EXECUTED_TIMEOUT || agreement.WorkloadTerminatedTime != 0 { //service timeout, this field is 0
archive = true
}
}
Expand Down Expand Up @@ -1632,10 +1642,12 @@ func (w *GovernanceWorker) RecordReply(proposal abstractprotocol.Proposal, proto

lc.EnvironmentAdditions = &envAdds

if err := w.processServiceSecrets(tcPolicy, proposal.AgreementId()); err != nil {
return err
}

if w.deviceType == persistence.DEVICE_TYPE_DEVICE {
if err := w.processServiceSecrets(tcPolicy, proposal.AgreementId()); err != nil {
return err
}

// Make a list of service dependencies for this workload. For sevices, it is just the top level dependencies.
deps := serviceDef.GetServiceDependencies()

Expand Down Expand Up @@ -1686,7 +1698,7 @@ func (w *GovernanceWorker) RecordReply(proposal abstractprotocol.Proposal, proto

// Save the secrets by agreement id since we don't have an instance id for the services yet
func (w *GovernanceWorker) processServiceSecrets(tcPolicy *policy.Policy, agId string) error {
glog.V(5).Infof(logString(fmt.Sprintf("process service secrets for agreement: %v", agId)))
glog.V(3).Infof(logString(fmt.Sprintf("process service secrets for agreement: %v, tcPolicy.SecretDetails: %v", agId, tcPolicy.SecretDetails)))

allSecrets := persistence.PersistedSecretFromPolicySecret(tcPolicy.SecretDetails, agId)

Expand Down
Loading

0 comments on commit 1ed07af

Please sign in to comment.