Skip to content

Commit

Permalink
Merge pull request #4016 from LiilyZhang/zhangl/issue3428
Browse files Browse the repository at this point in the history
Issue #3428 - MMSinCluster: enable MMS code in cluster ty…
  • Loading branch information
LiilyZhang authored Apr 19, 2024
2 parents 5737c30 + 762071b commit 6c81194
Show file tree
Hide file tree
Showing 17 changed files with 418 additions and 111 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ endif

realclean: i18n-clean clean

mostlyclean: anax-container-clean agbot-container-clean anax-k8s-clean css-clean ess-clean
mostlyclean: anax-container-clean agbot-container-clean anax-k8s-clean auto-upgrade-cronjob-k8s-clean css-clean ess-clean
@echo "Mostlyclean"
rm -f $(EXECUTABLE) $(CLI_EXECUTABLE) $(CSS_EXECUTABLE) $(ESS_EXECUTABLE) $(CLI_CONFIG_FILE)
rm -Rf vendor
Expand Down
9 changes: 8 additions & 1 deletion agent-install/agent-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ CRONJOB_AUTO_UPGRADE_NAME="auto-upgrade-cronjob"
IMAGE_REGISTRY_SECRET_NAME="openhorizon-agent-secrets-docker-cert"
CONFIGMAP_NAME="openhorizon-agent-config"
PVC_NAME="openhorizon-agent-pvc"
DEFAULT_PVC_SIZE="10Gi"
GET_RESOURCE_MAX_TRY=5
POD_ID=""
HZN_ENV_FILE="/tmp/agent-install-horizon-env"
Expand Down Expand Up @@ -158,6 +159,7 @@ Additional Edge Cluster Variables (in environment or config file):
EDGE_CLUSTER_REGISTRY_USERNAME: specify this value if the edge cluster registry requires authentication
EDGE_CLUSTER_REGISTRY_TOKEN: specify this value if the edge cluster registry requires authentication
EDGE_CLUSTER_STORAGE_CLASS: the storage class to use for the agent and edge services. Default: gp2
EDGE_CLUSTER_PVC_SIZE: the requested size in the agent persistent volume to use for the agent. Default: 10Gi
AGENT_NAMESPACE: The namespace the agent should run in. Default: openhorizon-agent
AGENT_WAIT_MAX_SECONDS: Maximum seconds to wait for the Horizon agent to start or stop. Default: 30
AGENT_DEPLOYMENT_STATUS_TIMEOUT_SECONDS: Maximum seconds to wait for the agent deployment rollout status to be successful. Default: 300
Expand Down Expand Up @@ -1254,6 +1256,7 @@ function get_all_variables() {

# get other variables for cluster agent
get_variable EDGE_CLUSTER_STORAGE_CLASS 'gp2'
get_variable EDGE_CLUSTER_PVC_SIZE "$DEFAULT_PVC_SIZE"
get_variable AGENT_NAMESPACE "$DEFAULT_AGENT_NAMESPACE"
get_variable NAMESPACE_SCOPED 'false'
get_variable USE_EDGE_CLUSTER_REGISTRY 'true'
Expand Down Expand Up @@ -4029,7 +4032,11 @@ function prepare_k8s_pvc_file() {
pvc_mode="ReadWriteMany"
fi

sed -e "s#__AgentNameSpace__#${AGENT_NAMESPACE}#g" -e "s/__StorageClass__/\"${EDGE_CLUSTER_STORAGE_CLASS}\"/g" -e "s#__PVCAccessMode__#${pvc_mode}#g" persistentClaim-template.yml >persistentClaim.yml
if [[ -z $CLUSTER_PVC_SIZE ]]; then
CLUSTER_PVC_SIZE=$DEFAULT_PVC_SIZE
fi

sed -e "s#__AgentNameSpace__#${AGENT_NAMESPACE}#g" -e "s/__StorageClass__/\"${EDGE_CLUSTER_STORAGE_CLASS}\"/g" -e "s#__PVCAccessMode__#${pvc_mode}#g" -e "s#__PVCStorageSize__#${CLUSTER_PVC_SIZE}#g" persistentClaim-template.yml >persistentClaim.yml
chk $? 'creating persistentClaim.yml'

log_debug "prepare_k8s_pvc_file() end"
Expand Down
23 changes: 21 additions & 2 deletions agent-install/k8s/deployment-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ spec:
- mountPath: /var/horizon
name: agent-pvc-storage
ports:
- containerPort: 8510
- containerPort: 8443
name: ess-secure
securityContext:
runAsUser: 1000
runAsGroup: 1000
Expand All @@ -90,4 +91,22 @@ spec:
# START_CERT_VOL
- name: HZN_MGMT_HUB_CERT_PATH
value: /etc/default/cert/agent-install.crt
# END_CERT_VOL
# END_CERT_VOL
---
apiVersion: v1
kind: Service
metadata:
name: agent-service
namespace: __AgentNameSpace__
labels:
app: agent
openhorizon.org/component: agent
spec:
selector:
app: agent
openhorizon.org/component: agent
ports:
- name: ess-secure-port-name
protocol: TCP
port: 8443
targetPort: 8443
2 changes: 1 addition & 1 deletion agent-install/k8s/persistentClaim-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ spec:
- __PVCAccessMode__
resources:
requests:
storage: 10Gi
storage: __PVCStorageSize__ # placeholder; agent-install.sh substitutes the requested PVC size
10 changes: 4 additions & 6 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,12 +1083,10 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,

// For the purposes of compatibility, skip this function if the agbot config has not been updated to point to the CSS.
// Only non-pattern based agreements can use MMS object policy.
if agreement.GetDeviceType() == persistence.DEVICE_TYPE_DEVICE {
if b.GetCSSURL() != "" && agreement.Pattern == "" {
AgreementHandleMMSObjectPolicy(b, b.mmsObjMgr, *agreement, workerId, BAWlogstring)
} else if b.GetCSSURL() == "" {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot")))
}
if b.GetCSSURL() != "" && agreement.Pattern == "" {
AgreementHandleMMSObjectPolicy(b, b.mmsObjMgr, *agreement, workerId, BAWlogstring)
} else if b.GetCSSURL() == "" {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot")))
}

// Send the reply Ack if it's still valid.
Expand Down
7 changes: 3 additions & 4 deletions anax-in-k8s/script/anax.service
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ block() {
editFSSAPIListen() {
echo "Edit FileSyncService.APIListen"
anaxJsonFile='/etc/horizon/anax.json'
echo "Modifying $anaxJsonFile for anax-in-container..."
echo "Modifying $anaxJsonFile for anax-in-k8s..."

anaxJson=$(jq . $anaxJsonFile)
checkrc $? "read anax.json"
cp $anaxJsonFile $anaxJsonFile.orig
checkrc $? "back up anax.json"

hostname=$(cat /etc/hostname)
anaxJson=$(jq ".Edge.FileSyncService.APIListen = \"$hostname\" " <<< $anaxJson)
anaxJson=$(jq ".Edge.FileSyncService.APIListen = \"0.0.0.0\" " <<< $anaxJson)
checkrc $? "change FileSyncService.APIListen"

anaxJson=$(jq ".Edge.FileSyncService.APIProtocol = \"https\" " <<< $anaxJson)
anaxJson=$(jq ".Edge.FileSyncService.APIProtocol = \"secure\" " <<< $anaxJson)
checkrc $? "change FileSyncService.APIProtocol"

echo "$anaxJson" > $anaxJsonFile
Expand Down
2 changes: 1 addition & 1 deletion common/deploymentconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,6 @@ func GetClusterDeploymentMetadata(clusterDeployment interface{}, inspectOperator
}

// GetKubeOperatorNamespace returns the namespace that
// kube_operator.ProcessDeployment reports for the given operator deployment
// (tar), together with any error from processing it.
func GetKubeOperatorNamespace(tar string) (string, error) {
	// Only the namespace is wanted, so every optional input is passed empty.
	// NOTE(review): the two "" arguments after the first map presumably are the
	// ESS auth and cert file paths added alongside sortAPIObjects' new
	// parameters; confirm against ProcessDeployment's signature.
	_, namespace, err := kube_operator.ProcessDeployment(tar, nil, map[string]string{}, "", "", map[string]string{}, "", 0)
	return namespace, err
}
23 changes: 20 additions & 3 deletions cutil/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -12,6 +13,8 @@ import (
"os"
)

const AGENT_PVC_NAME = "openhorizon-agent-pvc"

func NewKubeConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
Expand Down Expand Up @@ -93,8 +96,22 @@ func GetClusterNamespace() string {

// IsNamespaceScoped reports whether the HZN_NAMESPACE_SCOPED environment
// variable is set to exactly "true". Any other value, including unset,
// yields false.
func IsNamespaceScoped() bool {
	return os.Getenv("HZN_NAMESPACE_SCOPED") == "true"
}

// GetAgentPVCInfo fetches the agent's persistent volume claim
// (AGENT_PVC_NAME, i.e. "openhorizon-agent-pvc") from the agent namespace and
// returns its storage class name and access modes.
//
// The storage class name is "" when the claim does not name one. A non-nil
// error is returned, with an empty name and an empty access mode slice, when
// the kube client cannot be created or the claim cannot be retrieved.
func GetAgentPVCInfo() (string, []v1.PersistentVolumeAccessMode, error) {
	client, err := NewKubeClient()
	if err != nil {
		return "", []v1.PersistentVolumeAccessMode{}, err
	}

	agentPVC, err := client.CoreV1().PersistentVolumeClaims(GetClusterNamespace()).Get(context.Background(), AGENT_PVC_NAME, metav1.GetOptions{})
	if err != nil {
		return "", []v1.PersistentVolumeAccessMode{}, err
	}

	// Spec.StorageClassName is an optional *string; dereferencing it
	// unconditionally panics for a claim that does not name a storage class.
	scName := ""
	if agentPVC.Spec.StorageClassName != nil {
		scName = *agentPVC.Spec.StorageClassName
	}
	return scName, agentPVC.Spec.AccessModes, nil
}
32 changes: 23 additions & 9 deletions cutil/cutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,31 @@ func SetPlatformEnvvars(envAdds map[string]string, prefix string, agreementId st

// The name of the mounted file containing the FSS API SSL Certificate that the container should use.
envAdds[prefix+"ESS_CERT"] = path.Join(config.HZN_FSS_CERT_MOUNT, config.HZN_FSS_CERT_FILE)

}

// Temporary function to remove ESS env vars for the edge cluster case.
func RemoveESSEnvVars(envAdds map[string]string, prefix string) map[string]string {
delete(envAdds, prefix+"ESS_API_PROTOCOL")
delete(envAdds, prefix+"ESS_API_ADDRESS")
delete(envAdds, prefix+"ESS_API_PORT")
delete(envAdds, prefix+"ESS_AUTH")
delete(envAdds, prefix+"ESS_CERT")
return envAdds
func SetESSEnvVarsForClusterAgent(envAdds map[string]string, prefix string, agreementId string) {
// The address of the file sync service API.
namespace := GetClusterNamespace()
fssAddress := fmt.Sprintf("agent-service.%v.svc.cluster.local", namespace)
envAdds[prefix+"ESS_API_ADDRESS"] = fssAddress

fssPort := envAdds[prefix+"ESS_API_PORT"]
if strings.Contains(fssPort, "\"") {
fssPort = strings.ReplaceAll(fssPort, "\"", "")
}
envAdds[prefix+"ESS_API_PORT"] = fssPort

// The secret name of the FSS credentials that the operator should use.
envAdds[prefix+"ESS_AUTH"] = config.HZN_FSS_AUTH_PATH + "-" + agreementId

// The secret name of FSS API SSL Certificate that the operator should use.
envAdds[prefix+"ESS_CERT"] = config.HZN_FSS_CERT_PATH + "-" + agreementId

// The cluster agent namesace
envAdds[prefix+"AGENT_NAMESPACE"] = namespace

agentStorageClassNAME, _, _ := GetAgentPVCInfo()
envAdds[prefix+"AGENT_STORAGE_CLASS_NAME"] = agentStorageClassNAME
}

// This function is similar to the above, for env vars that are system related. It is only used by workloads.
Expand Down
83 changes: 62 additions & 21 deletions kube_operator/api_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type APIObjectInterface interface {
// Sort a slice of k8s api objects by kind of object
// Returns a map of object type names to api object interfaces types, the namespace to be used for the operator, and an error if one occurs
// Also verifies that all objects are named so they can be found and uninstalled
func sortAPIObjects(allObjects []APIObjects, customResources map[string][]*unstructured.Unstructured, metadata map[string]interface{}, envVarMap map[string]string, secretsMap map[string]string, agreementId string, crInstallTimeout int64) (map[string][]APIObjectInterface, string, error) {
func sortAPIObjects(allObjects []APIObjects, customResources map[string][]*unstructured.Unstructured, metadata map[string]interface{}, envVarMap map[string]string, fssAuthFilePath string, fssCertFilePath string, secretsMap map[string]string, agreementId string, crInstallTimeout int64) (map[string][]APIObjectInterface, string, error) {
namespace := ""

// get the namespace from metadata
Expand Down Expand Up @@ -123,7 +123,7 @@ func sortAPIObjects(allObjects []APIObjects, customResources map[string][]*unstr
return objMap, namespace, fmt.Errorf(kwlog(fmt.Sprintf("Error: multiple namespaces specified in operator: %s and %s", namespace, typedDeployment.ObjectMeta.Namespace)))
}
}
newDeployment := DeploymentAppsV1{DeploymentObject: typedDeployment, EnvVarMap: envVarMap, ServiceSecrets: secretsMap, AgreementId: agreementId}
newDeployment := DeploymentAppsV1{DeploymentObject: typedDeployment, EnvVarMap: envVarMap, FssAuthFilePath: fssAuthFilePath, FssCertFilePath: fssCertFilePath, ServiceSecrets: secretsMap, AgreementId: agreementId}
if newDeployment.Name() != "" {
glog.V(4).Infof(kwlog(fmt.Sprintf("Found kubernetes deployment object %s.", newDeployment.Name())))
objMap[K8S_DEPLOYMENT_TYPE] = append(objMap[K8S_DEPLOYMENT_TYPE], newDeployment)
Expand Down Expand Up @@ -629,6 +629,8 @@ func (sa ServiceAccountCoreV1) Namespace() string {
// DeploymentAppsV1 pairs an apps/v1 Deployment with the per-agreement data
// that Install and Uninstall use to create and remove the deployment and the
// agreement-scoped objects that accompany it.
type DeploymentAppsV1 struct {
// The deployment that Install creates in the target namespace.
DeploymentObject *appsv1.Deployment
// Env vars passed to CreateConfigMap (and CreateMMSPVC) during Install.
EnvVarMap map[string]string
// Path of the ESS auth file used to create the ESS auth secret; when "" the
// secret is not created.
FssAuthFilePath string
// Path of the ESS cert file used to create the ESS cert secret; when "" the
// secret is not created.
FssCertFilePath string
// Service vault secrets; when non-empty Install creates k8s secrets for them.
ServiceSecrets map[string]string
// Agreement id passed to every create/delete call for the agreement-scoped
// config map, secrets and MMS PVC.
AgreementId string
}
Expand All @@ -641,21 +643,50 @@ func (d DeploymentAppsV1) Install(c KubeClient, namespace string) error {
}

// Set the ESS env vars (API address, port, auth and cert secret names) for services running in the edge cluster.
envAdds := cutil.RemoveESSEnvVars(d.EnvVarMap, config.ENVVAR_PREFIX)
//envAdds := cutil.RemoveESSEnvVars(d.EnvVarMap, config.ENVVAR_PREFIX)
cutil.SetESSEnvVarsForClusterAgent(d.EnvVarMap, config.ENVVAR_PREFIX, d.AgreementId)

// Create the config map.
mapName, err := c.CreateConfigMap(envAdds, d.AgreementId, namespace)
mapName, err := c.CreateConfigMap(d.EnvVarMap, d.AgreementId, namespace)
if err != nil && errors.IsAlreadyExists(err) {
d.Uninstall(c, namespace)
mapName, err = c.CreateConfigMap(envAdds, d.AgreementId, namespace)
c.DeleteConfigMap(d.AgreementId, namespace)
mapName, err = c.CreateConfigMap(d.EnvVarMap, d.AgreementId, namespace)
}
if err != nil {
return err
}

// create k8s secrets object from ess auth file. d.FssAuthFilePath == "" if kubeworker is updating service vault secret
if d.FssAuthFilePath != "" {
essAuthSecretName, err := c.CreateESSAuthSecrets(d.FssAuthFilePath, d.AgreementId, namespace)
if err != nil && errors.IsAlreadyExists(err) {
c.DeleteESSAuthSecrets(d.AgreementId, namespace)
essAuthSecretName, _ = c.CreateESSAuthSecrets(d.FssAuthFilePath, d.AgreementId, namespace)
}
glog.V(3).Infof(kwlog(fmt.Sprintf("ess auth secret %v is created under namespace: %v", essAuthSecretName, namespace)))
}

if d.FssCertFilePath != "" {
essCertSecretName, err := c.CreateESSCertSecrets(d.FssCertFilePath, d.AgreementId, namespace)
if err != nil && errors.IsAlreadyExists(err) {
c.DeleteESSCertSecrets(d.AgreementId, namespace)
essCertSecretName, _ = c.CreateESSCertSecrets(d.FssCertFilePath, d.AgreementId, namespace)
}
glog.V(3).Infof(kwlog(fmt.Sprintf("ess cert secret %v is created under namespace: %v", essCertSecretName, namespace)))
}

// create MMS pvc
pvcName, err := c.CreateMMSPVC(d.EnvVarMap, d.AgreementId, namespace)
if err != nil && errors.IsAlreadyExists(err) {
c.DeleteMMSPVC(d.AgreementId, namespace)
pvcName, _ = c.CreateMMSPVC(d.EnvVarMap, d.AgreementId, namespace)
}
glog.V(3).Infof(kwlog(fmt.Sprintf("MMS pvc %v is created under namespace: %v", pvcName, namespace)))

// Let the operator know about the config map
dWithEnv := addConfigMapVarToDeploymentObject(*d.DeploymentObject, mapName)

// handle service vault secrets
if len(d.ServiceSecrets) > 0 {
glog.V(3).Infof(kwlog(fmt.Sprintf("creating k8s secrets for service secret %v", d.ServiceSecrets)))

Expand All @@ -667,7 +698,7 @@ func (d DeploymentAppsV1) Install(c KubeClient, namespace string) error {

secretsName, err := c.CreateK8SSecrets(decodedSecrets, d.AgreementId, namespace)
if err != nil && errors.IsAlreadyExists(err) {
d.Uninstall(c, namespace)
c.DeleteK8SSecrets(d.AgreementId, namespace)
secretsName, err = c.CreateK8SSecrets(d.ServiceSecrets, d.AgreementId, namespace)
}
if err != nil {
Expand All @@ -680,7 +711,7 @@ func (d DeploymentAppsV1) Install(c KubeClient, namespace string) error {
_, err = c.Client.AppsV1().Deployments(namespace).Create(context.Background(), &dWithEnv, metav1.CreateOptions{})
if err != nil && errors.IsAlreadyExists(err) {
d.Uninstall(c, namespace)
mapName, err = c.CreateConfigMap(envAdds, d.AgreementId, namespace)
_, _ = c.CreateConfigMap(d.EnvVarMap, d.AgreementId, namespace)
_, err = c.Client.AppsV1().Deployments(namespace).Create(context.Background(), &dWithEnv, metav1.CreateOptions{})
}
if err != nil {
Expand Down Expand Up @@ -731,21 +762,31 @@ func (d DeploymentAppsV1) Uninstall(c KubeClient, namespace string) {
glog.Errorf(kwlog(fmt.Sprintf("unable to delete deployment %s. Error: %v", d.DeploymentObject.ObjectMeta.Name, err)))
}

configMapName := fmt.Sprintf("%s-%s", HZN_ENV_VARS, d.AgreementId)
glog.V(3).Infof(kwlog(fmt.Sprintf("deleting config map %v in namespace %v", configMapName, namespace)))
// Delete the agreement config map
err = c.Client.CoreV1().ConfigMaps(namespace).Delete(context.Background(), configMapName, metav1.DeleteOptions{})
if err != nil {
glog.Errorf(kwlog(fmt.Sprintf("unable to delete config map %s. Error: %v", configMapName, err)))
glog.V(3).Infof(kwlog(fmt.Sprintf("deleting config map for agreement %v in namespace %v", d.AgreementId, namespace)))
if err = c.DeleteConfigMap(d.AgreementId, namespace); err != nil {
glog.Errorf(kwlog(err.Error()))
}

// delete the secrets contains agreement service vault secrets
secretsName := fmt.Sprintf("%s-%s", HZN_SERVICE_SECRETS, d.AgreementId)
glog.V(3).Infof(kwlog(fmt.Sprintf("deleting secrets %v in namespace %v", secretsName, namespace)))
err = c.Client.CoreV1().Secrets(namespace).Delete(context.Background(), secretsName, metav1.DeleteOptions{})
if err != nil {
glog.Errorf(kwlog(fmt.Sprintf("unable to delete secrets %s in namespace %v. Error: %v", secretsName, namespace, err)))
glog.V(3).Infof(kwlog(fmt.Sprintf("deleting ess auth secret for agreement %v in namespace %v", d.AgreementId, namespace)))
if err = c.DeleteESSAuthSecrets(d.AgreementId, namespace); err != nil {
glog.Errorf(kwlog(err.Error()))
}

glog.V(3).Infof(kwlog(fmt.Sprintf("deleting ess cert secret for agreement %v in namespace %v", d.AgreementId, namespace)))
if err = c.DeleteESSCertSecrets(d.AgreementId, namespace); err != nil {
glog.Errorf(kwlog(err.Error()))
}

glog.V(3).Infof(kwlog(fmt.Sprintf("deleting secrets for agreement %v in namespace %v", d.AgreementId, namespace)))
if err = c.DeleteK8SSecrets(d.AgreementId, namespace); err != nil {
glog.Errorf(kwlog(err.Error()))
}

glog.V(3).Infof(kwlog(fmt.Sprintf("deleting mms pvc for agreement %v in namespace %v", d.AgreementId, namespace)))
if err = c.DeleteMMSPVC(d.AgreementId, namespace); err != nil {
glog.Errorf(kwlog(err.Error()))
}

}

// Status will be the status of the operator pod
Expand All @@ -756,7 +797,7 @@ func (d DeploymentAppsV1) Status(c KubeClient, namespace string) (interface{}, e
return nil, err
} else if podList == nil || len(podList.Items) == 0 {
labelSelector := metav1.LabelSelector{MatchLabels: d.DeploymentObject.Spec.Selector.MatchLabels}
podList, err = c.Client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
podList, _ = c.Client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
}
return podList, nil
}
Expand Down
Loading

0 comments on commit 6c81194

Please sign in to comment.