Skip to content

Commit

Permalink
A lot of work for #14 where the Node interface passes context back to…
Browse files Browse the repository at this point in the history
… the Schedule.
  • Loading branch information
johrstrom committed Mar 11, 2019
1 parent 8491d00 commit 84517c6
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 96 deletions.
8 changes: 1 addition & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 36 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"sync"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -23,8 +24,37 @@ var (
DisableTimestamp: false,
TimestampFormat: time.RFC3339,
}

globalConfig *Config

cfgLock = sync.RWMutex{}
)

// GetConfig will return a global instance of the configuration if it has ever been loaded through
// LoadConfig. It can also return the default config if LoadConfig has never been called.
func GetConfig() *Config {
cfgLock.RLock()
defer cfgLock.RUnlock()

if globalConfig == nil {
return &DefaultConfig
}

return globalConfig
}

// GetEmailConfig will the EmailConfig portion of the global config object.
func GetEmailConfig() *EmailConfig {
cfgLock.RLock()
defer cfgLock.RUnlock()

if globalConfig == nil {
return &DefaultConfig.EmailConfig
}

return &globalConfig.EmailConfig
}

// LoadConfig loads the configuration based on the input file. Errors can occur for various
// i/o or marshalling related reasons. Defaults will be returned for primitive types, like strings.
func LoadConfig(filename string) (*Config, error) {
Expand All @@ -48,6 +78,10 @@ func LoadConfig(filename string) (*Config, error) {
config.loggers = make(map[string]*logrus.Logger)
}

cfgLock.Lock()
defer cfgLock.Unlock()

globalConfig = config
return config, nil

}
Expand All @@ -69,10 +103,9 @@ func (c *Config) GetLogger(name string) *logrus.Logger {
var logger *logrus.Logger
var found bool

c.logLock.Lock() //locking strategy probably a bit aggressive
defer c.logLock.Unlock()

if logger, found = c.loggers[name]; !found {
c.modLock.Lock() //locking strategy probably a bit aggressive
defer c.modLock.Unlock()

logger := logrus.New()
logger.Formatter = formatter
Expand Down
4 changes: 2 additions & 2 deletions config/testdata/file_with_loggers.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

storage: "file"
fileconfig:
file_config:
directory: "/tmp/deadline"
serverconfig:
server_config:
port: "8082"
logs:
manager: "debug"
Expand Down
6 changes: 3 additions & 3 deletions config/testdata/goodfile.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

storage: "file"
fileconfig:
file_config:
directory: "../server/"
dbconfig:
db_config:
connection_string: "N/A"
serverconfig:
server_config:
port: "8081"
10 changes: 5 additions & 5 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ const (

// Config represents the configuration struct for the entire deadline application
type Config struct {
FileConfig FileConfig `yaml:"fileconfig"`
DBConfig DBConfig `yaml:"dbconfig"`
FileConfig FileConfig `yaml:"file_config"`
DBConfig DBConfig `yaml:"db_config"`
Storage string `yaml:"storage"`
EvalTime string `yaml:"eval_timing"`
Server ServerConfig `yaml:"serverconfig"`
EmailConfig EmailConfig `yaml:"emailconfig"`
Server ServerConfig `yaml:"server_config"`
EmailConfig EmailConfig `yaml:"email_config"`
Logconfig map[string]string `yaml:"logs"`
loggers map[string]*logrus.Logger
logLock sync.Mutex
modLock sync.RWMutex
}

// FileConfig is the configuration type for file storage
Expand Down
39 changes: 17 additions & 22 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
package notifier

func (w Webhook) Send(msg string) {
// var str string
// str = msg
// jv, err := json.Marshal(str)
// if err != nil {
// common.CheckError(err)
// }
// _ , err = http.Post(w.Addr,"application/json", bytes.NewBuffer(jv))
// common.CheckError(err)
import (
"sync"

}
"github.com/att/deadline/config"
)

var once sync.Once
var notifier *Notifier

// func NewNotifyHandler(handlerType string, addr string) NotifyHandler {
// GetInstance gets the current running instance of a Notifier class
func GetInstance(cfg *config.Config) *Notifier {
once.Do(func() {
notifier = &Notifier{}
})

// switch handlerType {
// case "WEBHOOK":
return notifier
}

// w := &Webhook{
// Addr: addr,
// }
// w.TH.Name = handlerType
// Notify is the main API to notify some entity with a message of some kind
func (notifier *Notifier) Notify(notification Notification) {

// return w
// }
// common.Info.Println("Did not give a valid handler.")
// return &Webhook{}
// }
}
18 changes: 3 additions & 15 deletions notifier/types.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
package notifier
import (
"net/http"
)
package notifier

type NotifyHandler interface {
Send(string)
type Notifier struct {
}

type TypeHandler struct {
Name string
}
type Webhook struct {
TH TypeHandler
Addr string
Handler http.Handler

}
type Notification map[string]string
17 changes: 11 additions & 6 deletions schedule/handlers.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package schedule

// Handle is the email handler's implementation of the Handler interface.
func (h EmailHandlerNode) Handle(ctx *Context) {
func (handler EmailHandlerNode) Handle(ctx *Context) {
// cfg := config.GetEmailConfig()

// var client *smtp.Client

}

// Name is the email handlers implementation of the Node interface.
func (h EmailHandlerNode) Name() string {
return h.name
func (handler EmailHandlerNode) Name() string {
return handler.name
}

// Next defines what's after this node completes.
func (h EmailHandlerNode) Next() ([]*NodeInstance, *Context) {
return nil, nil
// Next for this type is simply defined. There's no logic computed. It
// return nil context.
func (handler EmailHandlerNode) Next() ([]*NodeInstance, *Context) {
var ret []*NodeInstance
return append(ret, handler.to), nil
}
35 changes: 24 additions & 11 deletions schedule/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,39 @@ func (node *EventNode) Next() ([]*NodeInstance, *Context) {
next := make([]*NodeInstance, 0)
var successful bool
var failureReason string
var ctx *Context
var ctx = newContext()
var pastDue = time.Now().Unix() > node.constraints.ReceiveBy
var received = node.event != nil

if !received && !pastDue {
return next, nil
return nil, &ctx
}

if received {
successful, failureReason = node.event.IsSuccessful(node.constraints)

} else if pastDue { // not received and past due

// logline here for debugging bc we can't currently see the schedule state through
// the api. it will probably be redundant/much less useful when we can.
log.WithFields(logrus.Fields{
"node": node.name,
"reason": "event never arrived",
"recieve-by": time.Unix(node.constraints.ReceiveBy, 0).Format(time.RFC3339),
}).Debug("node failed")

ctx = newContext(node.name, EventNeverArrived)
ctx = newFailedContext(node.name, EventNeverArrived)
next = append(next, node.errorTo)
}

if successful {
next = append(next, node.okTo)
} else {
ctx = newContext(node.name, failureReason)
ctx = newFailedContext(node.name, failureReason)
next = append(next, node.errorTo)
}

return next, ctx
return next, &ctx
}

// AddEvent adds an event to the EventNode
Expand All @@ -66,7 +69,7 @@ func (node *EndNode) Name() string {
return node.name
}

// Next for an end node returns nil for both array and error
// Next for an end node returns nil for both parameters
func (node *EndNode) Next() ([]*NodeInstance, *Context) {
return nil, nil
}
Expand All @@ -83,10 +86,20 @@ func (node *StartNode) Next() ([]*NodeInstance, *Context) {
return next, nil
}

func newContext(name string, reason string) *Context {
return &Context{
FailedNoded: name,
FailureReason: reason,
FailureTime: time.Now(),
func newFailedContext(name string, reason string) Context {
return Context{
Successful: false,
FailureContext: &FailureContext{
Node: name,
Reason: reason,
Time: time.Now(),
},
}
}

func newContext() Context {
return Context{
Successful: true,
FailureContext: nil,
}
}
8 changes: 5 additions & 3 deletions schedule/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ func TestEventOKTo(test *testing.T) {
} else {

node.AddEvent(&e)
next, err := node.Next()
next, ctx := node.Next()

assert.Nil(test, err, "")
assert.NotNil(test, ctx, "")
assert.Equal(test, true, ctx.Successful)
assert.Equal(test, len(next), 1)
assert.Equal(test, next[0], secondEventNode)
}

}

func TestEventErrorTo(test *testing.T) {
Expand All @@ -80,6 +82,6 @@ func TestEventErrorTo(test *testing.T) {
assert.Equal(test, len(next), 1)
assert.Equal(test, next[0], endNode)
assert.NotNil(test, ctx)
assert.Equal(test, com.LateEvent, ctx.FailureReason)
assert.Equal(test, com.LateEvent, ctx.FailureContext.Reason)
}
}
23 changes: 10 additions & 13 deletions schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,30 @@ import (

// Evaluate the schedule completely.
func (schedule *Schedule) Evaluate() State {
schedule.walk(schedule.Start, nil)
schedule.walk(schedule.Start)

return schedule.state
}

func (schedule *Schedule) walk(instance *NodeInstance, context *Context) {
func (schedule *Schedule) walk(instance *NodeInstance) {

switch node := instance.value.(type) {

case *StartNode:
schedule.walk(node.to, nil)
schedule.walk(node.to)
case *EventNode:
next, ctx := node.Next()
next, _ := node.Next()
if next != nil && len(next) > 0 {

if next[0] == node.errorTo {
schedule.state = Failed
}

schedule.walk(next[0], ctx)
schedule.walk(next[0])
}
case *EmailHandlerNode:
go node.Handle(context)
schedule.walk(node.to, nil)
//go node.Handle(context)
schedule.walk(node.to)
case *EndNode:
if schedule.state != Failed {
schedule.state = Ended
Expand Down Expand Up @@ -306,9 +306,6 @@ func checkEmptyFields(blueprint *com.ScheduleBlueprint) error {
}
}

// func contextFromNode(node *EventNode) Context {
// ctx := Context{
// FailedNoded: node.name,
// FailureTime: time.Now(),
// }
// }
func addFailureContext(ctx FailureContext) {

}
Loading

0 comments on commit 84517c6

Please sign in to comment.