Skip to content

Commit

Permalink
Merge branch 'master' into avoid_worker_polling
Browse files Browse the repository at this point in the history
  • Loading branch information
jlearman authored Jul 24, 2024
2 parents d6fedda + b10d2cd commit 7757467
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 36 deletions.
16 changes: 15 additions & 1 deletion agent-install/agent-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,8 @@ function get_all_variables() {

# get other variables for cluster agent
get_variable EDGE_CLUSTER_STORAGE_CLASS 'gp2'
check_cluster_storage_class "$EDGE_CLUSTER_STORAGE_CLASS"

get_variable EDGE_CLUSTER_PVC_SIZE "$DEFAULT_PVC_SIZE"
get_variable AGENT_NAMESPACE "$DEFAULT_AGENT_NAMESPACE"
get_variable NAMESPACE_SCOPED 'false'
Expand Down Expand Up @@ -1562,7 +1564,7 @@ function get_kubernetes_version() {
major_version=$($KUBECTL version -o json | jq '.serverVersion.major' | sed s/\"//g)
minor_version=$($KUBECTL version -o json | jq '.serverVersion.minor' | sed s/\"//g)
if [ "${minor_version:0-1}" == "+" ]; then
minor_version=${minor_version::-1}
minor_version=${minor_version::$((${#minor_version} - 1))}
fi

full_version="$major_version.$minor_version"
Expand Down Expand Up @@ -3307,6 +3309,18 @@ function get_cluster_image_arch() {
echo $image_arch
}

# check if the storage class exists in the edge cluster
function check_cluster_storage_class() {
log_debug "check_cluster_storage_class() begin"
local storage_class=$1
if $KUBECTL get storageclass ${storage_class} >/dev/null 2>&1; then
log_verbose "storage class $storage_class exists in the edge cluster"
else
log_fatal 2 "storage class $storage_class does not exist in the edge cluster"
fi
log_debug "check_cluster_storage_class() end"
}

# checks if OS/distribution/codename/arch is supported
function check_support() {
log_debug "check_support() begin"
Expand Down
16 changes: 16 additions & 0 deletions agreement/agreement.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (
EL_AG_TERM_UNABLE_SYNC_AGS = "anax terminating, unable to complete agreement sync up. %v"
)

// name for the subworker
const NODE_POLICY_WATCHER = "NodePolicyWatcher"

// This is does nothing useful at run time.
// This code is only used at compile time to make the eventlog messages get into the catalog so that
// they can be translated.
Expand Down Expand Up @@ -310,6 +313,9 @@ func (w *AgreementWorker) Initialize() bool {
}
}

// this subworker will catch if the node policy built-in properties have changed without the agent restarting
w.DispatchSubworker(NODE_POLICY_WATCHER, w.reconcileNodePolicy, 60, false)

glog.Info(logString(fmt.Sprintf("waiting for commands.")))

return true
Expand Down Expand Up @@ -341,6 +347,16 @@ func (w *AgreementWorker) syncNode() error {
return nil
}

func (w *AgreementWorker) reconcileNodePolicy() int {
if w.hznOffline {
return 60
}

w.checkNodePolicyChanges()

return 60
}

// Enter the command processing loop. Initialization is complete so wait for commands to
// perform. Commands are created as the result of events that are triggered elsewhere
// in the system. This function returns ture if the command was handled, false if not.
Expand Down
10 changes: 6 additions & 4 deletions api/api_eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ func (a *API) eventlog(w http.ResponseWriter, r *http.Request) {

glog.V(5).Infof(apiLogString(fmt.Sprintf("Handling %v on resource %v with selection %v. Language: %v", r.Method, resource, r.Form, lan)))

if err := DeleteEventLogs(a.db, prune, r.Form, msgPrinter); err != nil {
if count, err := DeleteEventLogs(a.db, prune, r.Form, msgPrinter); err != nil {
errorHandler(NewSystemError(msgPrinter.Sprintf("Error deleting %v, error %v", resource, err)))
} else {
} else if count > 0 {
if prune {
writeResponse(w, "Successfully pruned event logs.", http.StatusOK)
writeResponse(w, fmt.Sprintf("%v", count), http.StatusOK)
} else {
writeResponse(w, "Successfully deleted event logs.", http.StatusOK)
writeResponse(w, fmt.Sprintf("%v", count), http.StatusOK)
}
} else {
writeResponse(w, fmt.Sprintf("No matching event log entries found."), http.StatusNoContent)
}
case "OPTIONS":
w.Header().Set("Allow", "GET, OPTIONS")
Expand Down
10 changes: 5 additions & 5 deletions api/path_eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func FindEventLogsForOutput(db *bolt.DB, all_logs bool, selections map[string][]
}

// This API deletes the selected event logs saved on the db.
func DeleteEventLogs(db *bolt.DB, prune bool, selections map[string][]string, msgPrinter *message.Printer) error {
func DeleteEventLogs(db *bolt.DB, prune bool, selections map[string][]string, msgPrinter *message.Printer) (int, error) {
s := map[string][]persistence.Selector{}
if prune {
lastUnreg, err := persistence.GetLastUnregistrationTime(db)
if err != nil {
return fmt.Errorf("Failed to get the last unregistration time stamp from db. %v", err)
return 0, fmt.Errorf("Failed to get the last unregistration time stamp from db. %v", err)
}

s["timestamp"] = []persistence.Selector{persistence.Selector{Op: "<", MatchValue: lastUnreg}}
Expand All @@ -51,14 +51,14 @@ func DeleteEventLogs(db *bolt.DB, prune bool, selections map[string][]string, ms
var err error
s, err = persistence.ConvertToSelectors(selections)
if err != nil {
return fmt.Errorf(msgPrinter.Sprintf("Error converting the selections into Selectors: %v", err))
return 0, fmt.Errorf(msgPrinter.Sprintf("Error converting the selections into Selectors: %v", err))
} else {
glog.V(5).Infof(apiLogString(fmt.Sprintf("Converted selections into a map of persistence.Selector arrays: %v.", s)))
}
}

err := eventlog.DeleteEventLogs(db, s, msgPrinter)
return err
count, err := eventlog.DeleteEventLogs(db, s, msgPrinter)
return count, err
}

func FindSurfaceLogsForOutput(db *bolt.DB, msgPrinter *message.Printer) ([]persistence.SurfaceError, error) {
Expand Down
2 changes: 1 addition & 1 deletion api/path_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func LogDeviceEvent(db *bolt.DB, severity string, message *persistence.MessageMe
if d.Org != nil {
org = *d.Org
}
if d.Pattern != nil {
if d.Pattern != nil && *d.Pattern != "" {
pattern = fmt.Sprintf("%v/%v", org, *d.Pattern)
}
if d.Config != nil {
Expand Down
40 changes: 28 additions & 12 deletions cli/cliutils/cliutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,20 +1098,29 @@ func GetIcpCertPath() string {

// TrustIcpCert adds the icp cert file to be trusted in calls made by the given http client
func TrustIcpCert(httpClient *http.Client) error {
icpCertPath := GetIcpCertPath()
if icpCertPath != "" {
icpCert, err := ioutil.ReadFile(icpCertPath)
if err != nil {
return fmt.Errorf(i18n.GetMessagePrinter().Sprintf("Encountered error reading ICP cert file %v: %v", icpCertPath, err))
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(icpCert)
icpCertPath := GetIcpCertPath()

transport := httpClient.Transport.(*http.Transport)
transport.TLSClientConfig.RootCAs = caCertPool
var caCertPool *x509.CertPool
var err error

}
return nil
// Trust the system certs like the anax agent code can
caCertPool, err = x509.SystemCertPool()
if err != nil {
// Decided not to fail and return here but just create a new pool
caCertPool = x509.NewCertPool()
}

if icpCertPath != "" {
icpCert, err := ioutil.ReadFile(icpCertPath)
if err != nil {
return fmt.Errorf(i18n.GetMessagePrinter().Sprintf("Encountered error reading ICP cert file %v: %v", icpCertPath, err))
}
caCertPool.AppendCertsFromPEM(icpCert)
}

transport := httpClient.Transport.(*http.Transport)
transport.TLSClientConfig.RootCAs = caCertPool
return nil
}

// Get exchange url from /etc/default/horizon file. if not set, check /etc/horizon/anax.json file
Expand Down Expand Up @@ -2293,3 +2302,10 @@ func ValidateOrg(org string) bool {
}
return invalidCheck
}

// remove leading and trailing quotation marks if present
func RemoveQuotes(s string) string {
s = strings.TrimPrefix(s, "\"")
s = strings.TrimSuffix(s, "\"")
return s
}
17 changes: 11 additions & 6 deletions cli/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/open-horizon/anax/cli/cliutils"
"github.com/open-horizon/anax/i18n"
"github.com/open-horizon/anax/persistence"
"net/http"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -80,12 +81,12 @@ func Delete(selections []string, force bool) {
cliutils.ConfirmRemove(i18n.GetMessagePrinter().Sprintf("Are you sure you want to remove all event logs?"))
}

cliutils.HorizonDelete(url_s, []int{200, 204}, []int{}, false)
retCode, count := cliutils.HorizonDelete(url_s, []int{204}, []int{200}, false)

if len(selections) > 0 {
fmt.Println(i18n.GetMessagePrinter().Sprintf("Successfully deleted the selected eventlogs."))
if retCode == http.StatusOK {
fmt.Println(i18n.GetMessagePrinter().Sprintf("Successfully deleted %v matching event log entries.", cliutils.RemoveQuotes(fmt.Sprintf("%v",count))))
} else {
fmt.Println(i18n.GetMessagePrinter().Sprintf("Successfully deleted the eventlogs."))
fmt.Println(i18n.GetMessagePrinter().Sprintf("No event log entries matching the given selectors were found."))
}
}

Expand All @@ -96,9 +97,13 @@ func Prune(force bool) {
cliutils.ConfirmRemove(i18n.GetMessagePrinter().Sprintf("Are you sure you want to remove all event logs from previous registrations?"))
}

cliutils.HorizonDelete(url, []int{200, 204}, []int{}, false)
retCode, count := cliutils.HorizonDelete(url, []int{204}, []int{200}, false)

fmt.Println(i18n.GetMessagePrinter().Sprintf("Successfully pruned the eventlogs."))
if retCode == http.StatusOK {
fmt.Println(i18n.GetMessagePrinter().Sprintf("Successfully pruned %v matching event log entries.", cliutils.RemoveQuotes(fmt.Sprintf("%v",count))))
} else {
fmt.Println(i18n.GetMessagePrinter().Sprintf("No event log entries from previous registrations were found."))
}
}

func List(all bool, detail bool, selections []string, tailing bool) {
Expand Down
6 changes: 3 additions & 3 deletions cli/hzn.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ Environment Variables:
listTail := eventlogListCmd.Flag("tail", msgPrinter.Sprintf("Continuously polls the event log to display the most recent records, similar to tail -F behavior.")).Short('f').Bool()
listAllEventlogs := eventlogListCmd.Flag("all", msgPrinter.Sprintf("List all the event logs including the previous registrations.")).Short('a').Bool()
listDetailedEventlogs := eventlogListCmd.Flag("long", msgPrinter.Sprintf("List event logs with details.")).Short('l').Bool()
listSelectedEventlogs := eventlogListCmd.Flag("select", msgPrinter.Sprintf("Selection string. This flag can be repeated which means 'AND'. Each flag should be in the format of attribute=value, attribute~value, \"attribute>value\" or \"attribute<value\", where '~' means contains. The common attribute names are timestamp, severity, message, event_code, source_type, agreement_id, service_url etc. Use the '-l' flag to see all the attribute names.")).Short('s').Strings()
eventlogDeleteCmd := eventlogCmd.Command("delete | del", msgPrinter.Sprintf("Delete the all event logs or those matching the provided selectors.")).Alias("del").Alias("delete")
deleteSelectedEventlogs := eventlogDeleteCmd.Flag("select", msgPrinter.Sprintf("Selection string. This flag can be repeated which means 'AND'. Each flag should be in the format of attribute=value, attribute~value, \"attribute>value\" or\"attribute<value\", where '~' means contains. The common attribute names are timestamp, severity, message, event_code, source_type, agreement_id, service_url etc. Use the '-l' flag to see all the attribute names.")).Short('s').Strings()
listSelectedEventlogs := eventlogListCmd.Flag("select", msgPrinter.Sprintf("Selection string. This flag can be repeated which means 'AND'. Each flag should be in the format of attribute=value, attribute~value, \"attribute>value\" or \"attribute<value\", where '~' means contains. The common attribute names are timestamp, time_since (unit is hours), severity, message, event_code, source_type, agreement_id, service_url etc. Use the '-l' flag to see all the attribute names.")).Short('s').Strings()
eventlogDeleteCmd := eventlogCmd.Command("delete | del", msgPrinter.Sprintf("Delete all the event logs or those matching the provided selectors.")).Alias("del").Alias("delete")
deleteSelectedEventlogs := eventlogDeleteCmd.Flag("select", msgPrinter.Sprintf("Selection string. This flag can be repeated which means 'AND'. Each flag should be in the format of attribute=value, attribute~value, \"attribute>value\" or\"attribute<value\", where '~' means contains. The common attribute names are timestamp, time_since (unit is hours), severity, message, event_code, source_type, agreement_id, service_url etc. Use the '-l' flag to see all the attribute names.")).Short('s').Strings()
deleteEventLogsForce := eventlogDeleteCmd.Flag("force", msgPrinter.Sprintf("Skip the 'are you sure?' prompt.")).Short('f').Bool()
eventlogPruneCmd := eventlogCmd.Command("prune | pr", msgPrinter.Sprintf("Delete the all event logs from previous registrations.")).Alias("pr").Alias("prune")
pruneEventLogsForce := eventlogPruneCmd.Flag("force", msgPrinter.Sprintf("Skip the 'are you sure?' prompt.")).Short('f').Bool()
Expand Down
2 changes: 1 addition & 1 deletion eventlog/eventlog_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func GetEventLogs(db *bolt.DB, all_logs bool, selectors map[string][]persistence
return persistence.FindEventLogsWithSelectors(db, all_logs, selectors, msgPrinter)
}

func DeleteEventLogs(db *bolt.DB, selectors map[string][]persistence.Selector, msgPrinter *message.Printer) error {
func DeleteEventLogs(db *bolt.DB, selectors map[string][]persistence.Selector, msgPrinter *message.Printer) (int, error) {
return persistence.DeleteEventLogsWithSelectors(db, selectors, msgPrinter)
}

Expand Down
14 changes: 11 additions & 3 deletions persistence/eventlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const EVENT_LOGS = "event_logs"
// table stores the timestamp of last unregistration
const LAST_UNREG = "last_unreg"

const BASE_SELECTORS = "source_type,severity,message,event_code,record_id,timestamp" // only support these 2 for now
const BASE_SELECTORS = "source_type,severity,message,event_code,record_id,timestamp,time_since"

// Each event source implements this interface.
type EventSourceInterface interface {
Expand Down Expand Up @@ -65,6 +65,8 @@ func (w EventLogBase) Matches(selectors map[string][]Selector) bool {
attr = w.Id
case "timestamp":
attr = w.Timestamp
case "time_since":
attr = (uint64(time.Now().Unix()) - w.Timestamp)/3600
default:
return false // not tolerate wrong attribute name in the selector
}
Expand Down Expand Up @@ -410,15 +412,18 @@ func FindEventLogs(db *bolt.DB, filters []EventLogFilter) ([]EventLog, error) {
}

// delete event logs from the db that match the given selectors
func DeleteEventLogsWithSelectors(db *bolt.DB, selectors map[string][]Selector, msgPrinter *message.Printer) error {
// returns the number of logs deleted
func DeleteEventLogsWithSelectors(db *bolt.DB, selectors map[string][]Selector, msgPrinter *message.Printer) (int, error) {
// separate base selectors from the source selectors
base_selectors, source_selectors := GroupSelectors(selectors)

if msgPrinter == nil {
msgPrinter = i18n.GetMessagePrinter()
}

return db.Update(func(tx *bolt.Tx) error {
count := 0

dbErr := db.Update(func(tx *bolt.Tx) error {
if b := tx.Bucket([]byte(EVENT_LOGS)); b != nil {
b.ForEach(func(k, v []byte) error {
var el EventLogRaw
Expand All @@ -431,6 +436,7 @@ func DeleteEventLogsWithSelectors(db *bolt.DB, selectors map[string][]Selector,
glog.Errorf("Unable to convert event source: %v. Error: %v", el.Source, err)
} else if (*esrc).Matches(source_selectors) {
b.Delete(k)
count ++
}
}
}
Expand All @@ -439,6 +445,8 @@ func DeleteEventLogsWithSelectors(db *bolt.DB, selectors map[string][]Selector,
}
return nil
})

return count, dbErr
}

// find event logs from the db for the given given selectors.
Expand Down

0 comments on commit 7757467

Please sign in to comment.