Skip to content

Commit

Permalink
Merge pull request #33 from flant/feat_implicit_enabled_key
Browse files Browse the repository at this point in the history
Explicit enabled key for modules
  • Loading branch information
diafour authored Aug 13, 2019
2 parents e70c0a5 + 3c93346 commit 0d01582
Show file tree
Hide file tree
Showing 22 changed files with 1,145 additions and 703 deletions.
69 changes: 13 additions & 56 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ var (

// ManagersEventsHandlerStopCh is the channel object for stopping infinite loop of the ManagersEventsHandler.
ManagersEventsHandlerStopCh chan struct{}

// HelmClient is the object for Helm client.
HelmClient helm.HelmClient
)

const DefaultModulesDir = "modules"
Expand Down Expand Up @@ -123,7 +120,7 @@ func Init() error {
// Initializing helm. Installing Tiller, if it is missing.
tillerNamespace := kube.AddonOperatorNamespace
rlog.Infof("INIT: Namespace for Tiller: %s", tillerNamespace)
HelmClient, err = helm.Init(tillerNamespace)
err = helm.Init(tillerNamespace)
if err != nil {
rlog.Errorf("INIT: cannot initialize Tiller in ns/%s: %s", tillerNamespace, err)
return err
Expand All @@ -146,8 +143,7 @@ func Init() error {
kube_config_manager.ValuesChecksumsAnnotation = ValuesChecksumsAnnotation
module_manager.Init()
ModuleManager = module_manager.NewMainModuleManager().
WithDirectories(ModulesDir, GlobalHooksDir, TempDir).
WithHelmClient(HelmClient)
WithDirectories(ModulesDir, GlobalHooksDir, TempDir)
err = ModuleManager.Init()
if err != nil {
rlog.Errorf("INIT: Cannot initialize module manager: %s", err)
Expand Down Expand Up @@ -232,61 +228,23 @@ func ManagersEventsHandler() {
// Some modules have changed.
case module_manager.ModulesChanged:
for _, moduleChange := range moduleEvent.ModulesChanges {
switch moduleChange.ChangeType {
case module_manager.Enabled:
// TODO этого события по сути нет. Нужно реализовать для вызова onStartup!
rlog.Infof("EVENT ModulesChanged, type=Enabled")
newTask := task.NewTask(task.ModuleRun, moduleChange.Name).
WithOnStartupHooks(true)
TasksQueue.Add(newTask)
rlog.Infof("QUEUE add ModuleRun %s", newTask.Name)

err := KubeEventsHooks.EnableModuleHooks(moduleChange.Name, ModuleManager, KubeEventsManager)
if err != nil {
rlog.Errorf("MAIN_LOOP module '%s' enabled: cannot enable hooks: %s", moduleChange.Name, err)
}

case module_manager.Changed:
rlog.Infof("EVENT ModulesChanged, type=Changed")
newTask := task.NewTask(task.ModuleRun, moduleChange.Name)
TasksQueue.Add(newTask)
rlog.Infof("QUEUE add ModuleRun %s", newTask.Name)

case module_manager.Disabled:
rlog.Infof("EVENT ModulesChanged, type=Disabled")
newTask := task.NewTask(task.ModuleDelete, moduleChange.Name)
TasksQueue.Add(newTask)
rlog.Infof("QUEUE add ModuleDelete %s", newTask.Name)

err := KubeEventsHooks.DisableModuleHooks(moduleChange.Name, ModuleManager, KubeEventsManager)
if err != nil {
rlog.Errorf("MAIN_LOOP module '%s' disabled: cannot disable hooks: %s", moduleChange.Name, err)
}

case module_manager.Purged:
rlog.Infof("EVENT ModulesChanged, type=Purged")
newTask := task.NewTask(task.ModulePurge, moduleChange.Name)
TasksQueue.Add(newTask)
rlog.Infof("QUEUE add ModulePurge %s", newTask.Name)

err := KubeEventsHooks.DisableModuleHooks(moduleChange.Name, ModuleManager, KubeEventsManager)
if err != nil {
rlog.Errorf("MAIN_LOOP module '%s' purged: cannot disable hooks: %s", moduleChange.Name, err)
}
}
rlog.Infof("EVENT ModulesChanged, type=Changed")
newTask := task.NewTask(task.ModuleRun, moduleChange.Name)
TasksQueue.Add(newTask)
rlog.Infof("QUEUE add ModuleRun %s", newTask.Name)
}
// As module list may have changed, hook schedule index must be re-created.
ScheduledHooks = UpdateScheduleHooks(ScheduledHooks)
// As global values may have changed, all modules must be restarted.
case module_manager.GlobalChanged:
// Global values are changed, all modules must be restarted.
rlog.Infof("EVENT GlobalChanged")
TasksQueue.ChangesDisable()
CreateReloadAllTasks(false)
TasksQueue.ChangesEnable(true)
// Re-creating schedule hook index
ScheduledHooks = UpdateScheduleHooks(ScheduledHooks)
case module_manager.AmbigousState:
rlog.Infof("EVENT AmbigousState")
rlog.Infof("EVENT AmbiguousState")
TasksQueue.ChangesDisable()
// It is the error in the module manager. The task must be added to
// the beginning of the queue so the module manager can restore its
Expand Down Expand Up @@ -401,13 +359,15 @@ func runDiscoverModulesState(t task.Task) error {
ScheduledHooks = UpdateScheduleHooks(nil)

// Enable kube events hooks for newly enabled modules
// FIXME convert to a task that run after AfterHelm if there is a flag in binding config to start informers after CRD installation.
for _, moduleName := range modulesState.EnabledModules {
err = KubeEventsHooks.EnableModuleHooks(moduleName, ModuleManager, KubeEventsManager)
if err != nil {
return err
}
}

// TODO is queue should be cleaned from hook run tasks of deleted module?
// Disable kube events hooks for newly disabled modules
for _, moduleName := range modulesState.ModulesToDisable {
err = KubeEventsHooks.DisableModuleHooks(moduleName, ModuleManager, KubeEventsManager)
Expand All @@ -422,9 +382,7 @@ func runDiscoverModulesState(t task.Task) error {
// TasksRunner handle tasks in queue.
//
// Task handler may delay task processing by pushing delay to the queue.
// TODO пока только один обработчик, всё ок. Но лучше, чтобы очередь позволяла удалять только то, чему ранее был сделан peek.
// Т.е. кто взял в обработку задание, тот его и удалил из очереди. Сейчас Peek-нуть может одна го-рутина, другая добавит,
// первая Pop-нет задание — новое задание пропало, второй раз будет обработано одно и тоже.
// FIXME: For now, only one TaskRunner for a TasksQueue. There should be a lock between Peek and Pop to prevent Poping tasks from other TaskRunner
func TasksRunner() {
for {
if TasksQueue.IsEmpty() {
Expand Down Expand Up @@ -457,7 +415,7 @@ func TasksRunner() {
if err != nil {
MetricsStorage.SendCounterMetric(PrefixMetric("module_run_errors"), 1.0, map[string]string{"module": t.GetName()})
t.IncrementFailureCount()
rlog.Errorf("TASK_RUN %s '%s' failed. Will retry after delay. Failed count is %d. Error: %s", t.GetType(), t.GetName(), t.GetFailureCount(), err)
rlog.Errorf("TASK_RUN ModuleRun '%s' failed. Will retry after delay. Failed count is %d. Error: %s", t.GetName(), t.GetFailureCount(), err)
TasksQueue.Push(task.NewTaskDelay(FailedModuleDelay))
rlog.Infof("QUEUE push FailedModuleDelay")
} else {
Expand Down Expand Up @@ -518,14 +476,13 @@ func TasksRunner() {
case task.ModulePurge:
rlog.Infof("TASK_RUN ModulePurge %s", t.GetName())
// Module for purge is unknown so log deletion error is enough.
err := HelmClient.DeleteRelease(t.GetName())
err := helm.Client.DeleteRelease(t.GetName())
if err != nil {
rlog.Errorf("TASK_RUN %s Helm delete '%s' failed. Error: %s", t.GetType(), t.GetName(), err)
}
TasksQueue.Pop()
case task.ModuleManagerRetry:
rlog.Infof("TASK_RUN ModuleManagerRetry")
// TODO метрику нужно отсылать из module_manager. Cделать metric_storage глобальным!
MetricsStorage.SendCounterMetric(PrefixMetric("modules_discover_errors"), 1.0, map[string]string{})
ModuleManager.Retry()
TasksQueue.Pop()
Expand Down
35 changes: 18 additions & 17 deletions pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (m *ModuleManagerMock) DiscoverModulesState() (*module_manager.ModulesState
func (m *ModuleManagerMock) GetGlobalHook(name string) (*module_manager.GlobalHook, error) {
if _, has_hook := scheduledHooks[name]; has_hook {
return &module_manager.GlobalHook{
Hook: &module_manager.Hook{
CommonHook: &module_manager.CommonHook{
Name: name,
Path: "/addon-operator/hooks/global_1",
Bindings: []module_manager.BindingType{module_manager.Schedule},
Expand All @@ -142,7 +142,7 @@ func (m *ModuleManagerMock) GetGlobalHook(name string) (*module_manager.GlobalHo
} else {
// Global hook run task handler requires Path field
return &module_manager.GlobalHook{
Hook: &module_manager.Hook{
CommonHook: &module_manager.CommonHook{
Name: name,
Path: "/addon-operator/hooks/global_hook_1_1",
Bindings: []module_manager.BindingType{module_manager.BeforeAll},
Expand All @@ -158,7 +158,7 @@ func (m *ModuleManagerMock) GetGlobalHook(name string) (*module_manager.GlobalHo
func (m *ModuleManagerMock) GetModuleHook(name string) (*module_manager.ModuleHook, error) {
if _, has_hook := scheduledHooks[name]; has_hook {
return &module_manager.ModuleHook{
Hook: &module_manager.Hook{
CommonHook: &module_manager.CommonHook{
Name: name,
Path: "/addon-operator/modules/000_test_modu",
Bindings: []module_manager.BindingType{module_manager.Schedule},
Expand Down Expand Up @@ -256,11 +256,6 @@ func (m *ModuleManagerMock) WithDirectories(modulesDir string, globalHooksDir st
return m
}

func (m *ModuleManagerMock) WithHelmClient(helm helm.HelmClient) module_manager.ModuleManager {
fmt.Println("WithHelm")
return m
}

func (m *ModuleManagerMock) WithKubeConfigManager(kubeConfigManager kube_config_manager.KubeConfigManager) module_manager.ModuleManager {
fmt.Println("WithKubeConfigManager")
return m
Expand Down Expand Up @@ -350,14 +345,15 @@ func TestMain_ModulesEventsHandler(t *testing.T) {
KubeEventsManager = &KubeEventsManagerMock{}
KubeEventsHooks = &KubeEventsHooksControllerMock{}

assert.Equal(t, 0, 0)
fmt.Println("Create queue")
// Fill a queue
TasksQueue = task.NewTasksQueue()
// watcher for more verbosity of CreateStartupTasks and
TasksQueue.AddWatcher(&QueueDumperTest{})
TasksQueue.ChangesEnable(true)

assert.Equal(t, 0, TasksQueue.Length())

go func(ch chan module_manager.Event) {
ch <- module_manager.Event{
Type: module_manager.ModulesChanged,
Expand All @@ -368,20 +364,25 @@ func TestMain_ModulesEventsHandler(t *testing.T) {
},
{
Name: "test_module_2",
ChangeType: module_manager.Disabled,
ChangeType: module_manager.Changed,
},
},
}
ch <- module_manager.Event{
Type: module_manager.ModulesChanged,
ModulesChanges: []module_manager.ModuleChange{
{
Name: "test_module_purged",
ChangeType: module_manager.Purged,
Name: "test_module_2",
ChangeType: module_manager.Changed,
},
},
}
ch <- module_manager.Event{
Type: module_manager.ModulesChanged,
ModulesChanges: []module_manager.ModuleChange{
{
Name: "test_module_enabled",
ChangeType: module_manager.Enabled,
Name: "test_module_1",
ChangeType: module_manager.Changed,
},
},
}
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestMain_Run_With_InfiniteModuleError(t *testing.T) {

// Сделать моки для всего, что нужно для запуска Run

HelmClient = MockHelmClient{
helm.Client = MockHelmClient{
DeleteReleaseErrorsCount: 0,
}

Expand Down Expand Up @@ -486,7 +487,7 @@ func TestMain_Run_With_RecoverableErrors(t *testing.T) {

// Сделать моки для всего, что нужно для запуска Run

HelmClient = MockHelmClient{
helm.Client = MockHelmClient{
DeleteReleaseErrorsCount: 3,
}

Expand Down Expand Up @@ -564,7 +565,7 @@ func TestMain_ScheduledTasks(t *testing.T) {

runOrder = []int{}

HelmClient = MockHelmClient{
helm.Client = MockHelmClient{
DeleteReleaseErrorsCount: 3,
}

Expand Down
18 changes: 11 additions & 7 deletions pkg/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,34 @@ type HelmClient interface {
IsReleaseExists(releaseName string) (bool, error)
}

var Client HelmClient

type CliHelm struct {
tillerNamespace string
}

// Init starts Tiller installation.
func Init(tillerNamespace string) (HelmClient, error) {
func Init(tillerNamespace string) error {
rlog.Info("Helm: run helm init")

helm := &CliHelm{tillerNamespace: tillerNamespace}
cliHelm := &CliHelm{tillerNamespace: tillerNamespace}

err := helm.InitTiller()
err := cliHelm.InitTiller()
if err != nil {
return nil, err
return err
}

stdout, stderr, err := helm.Cmd("version")
stdout, stderr, err := cliHelm.Cmd("version")
if err != nil {
return nil, fmt.Errorf("unable to get helm version: %v\n%v %v", err, stdout, stderr)
return fmt.Errorf("unable to get helm version: %v\n%v %v", err, stdout, stderr)
}
rlog.Infof("Helm: helm version:\n%v %v", stdout, stderr)

rlog.Info("Helm: successfully initialized")

return helm, nil
Client = cliHelm

return nil
}

// InitTiller runs helm init with the same ServiceAccountName, NodeSelector and Tolerations
Expand Down
2 changes: 1 addition & 1 deletion pkg/helm/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func shouldUpgradeRelease(helm HelmClient, releaseName string, chart string, val
}

func TestHelm(t *testing.T) {
// Для теста требуется kubernetes + helm, поэтому skip
// Skip because this test needs a Kubernetes cluster and a helm binary.
t.Skip()

var err error
Expand Down
20 changes: 10 additions & 10 deletions pkg/kube_config_manager/kube_config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func (kcm *MainKubeConfigManager) initConfig() error {
globalValuesChecksum = globalKubeConfig.Checksum
}

for module := range GetModulesNamesFromConfigData(obj.Data) {
for moduleName := range GetModulesNamesFromConfigData(obj.Data) {
// all GetModulesNamesFromConfigData must exist
moduleKubeConfig, err := ModuleKubeConfigMustExist(GetModuleKubeConfigFromConfigData(module, obj.Data))
moduleKubeConfig, err := ExtractModuleKubeConfig(moduleName, obj.Data)
if err != nil {
return err
}
Expand Down Expand Up @@ -337,9 +337,9 @@ func (kcm *MainKubeConfigManager) handleNewCm(obj *v1.ConfigMap) error {

// calculate new checksums of a module sections
newModulesValuesChecksum := make(map[string]string)
for module := range GetModulesNamesFromConfigData(obj.Data) {
for moduleName := range GetModulesNamesFromConfigData(obj.Data) {
// all GetModulesNamesFromConfigData must exist
moduleKubeConfig, err := ModuleKubeConfigMustExist(GetModuleKubeConfigFromConfigData(module, obj.Data))
moduleKubeConfig, err := ExtractModuleKubeConfig(moduleName, obj.Data)
if err != nil {
return err
}
Expand All @@ -365,21 +365,21 @@ func (kcm *MainKubeConfigManager) handleNewCm(obj *v1.ConfigMap) error {

// create ModuleConfig for each module in configData
// IsUpdated flag set for updated configs
for module := range actualModulesNames {
for moduleName := range actualModulesNames {
// all GetModulesNamesFromConfigData must exist
moduleKubeConfig, err := ModuleKubeConfigMustExist(GetModuleKubeConfigFromConfigData(module, obj.Data))
moduleKubeConfig, err := ExtractModuleKubeConfig(moduleName, obj.Data)
if err != nil {
return err
}

if moduleKubeConfig.Checksum != savedChecksums[module] && moduleKubeConfig.Checksum != kcm.ModulesValuesChecksum[module] {
kcm.ModulesValuesChecksum[module] = moduleKubeConfig.Checksum
if moduleKubeConfig.Checksum != savedChecksums[moduleName] && moduleKubeConfig.Checksum != kcm.ModulesValuesChecksum[moduleName] {
kcm.ModulesValuesChecksum[moduleName] = moduleKubeConfig.Checksum
moduleKubeConfig.ModuleConfig.IsUpdated = true
updatedCount++
} else {
moduleKubeConfig.ModuleConfig.IsUpdated = false
}
moduleConfigsActual[module] = moduleKubeConfig.ModuleConfig
moduleConfigsActual[moduleName] = moduleKubeConfig.ModuleConfig
}

// delete checksums for removed module sections
Expand Down Expand Up @@ -448,6 +448,7 @@ func (kcm *MainKubeConfigManager) handleCmDelete(obj *v1.ConfigMap) error {
// Global values is already known to be empty.
// So check each module values change separately,
// and generate signals per-module.
// Note: Only ModuleName field is needed in ModuleConfig.

moduleConfigsUpdate := make(ModuleConfigs)

Expand All @@ -459,7 +460,6 @@ func (kcm *MainKubeConfigManager) handleCmDelete(obj *v1.ConfigMap) error {
delete(kcm.ModulesValuesChecksum, module)
moduleConfigsUpdate[module] = utils.ModuleConfig{
ModuleName: module,
IsEnabled: true,
Values: make(utils.Values),
}
}
Expand Down
Loading

0 comments on commit 0d01582

Please sign in to comment.