Skip to content

Commit

Permalink
events pass context from Next api so failures can propogate to handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
johrstrom committed Oct 5, 2018
1 parent ecdf2f7 commit 0670405
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 45 deletions.
21 changes: 18 additions & 3 deletions common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"time"
)

const (
// LateEvent is the string returned for events that do not arrive on time
LateEvent string = "event did not arrive by the expected time"
)

// Event is the struct that represents something that has happened from some other system.
type Event struct {
Name string `json:"name" xml:"name,attr" db:"name"`
Expand All @@ -28,10 +33,20 @@ func (e *Event) ValidateEvent() error {
return nil
}

// IsSuccessful returns true if the event has met it's set of contstraints.
func (e *Event) IsSuccessful(c EventConstraints) bool {
// IsSuccessful returns true if the event has met it's set of contstraints. If false,
// it will also return a non-empty string as the reason for it's failure
func (e *Event) IsSuccessful(c EventConstraints) (bool, string) {
var reason string
var success bool

onTime := e.ReceivedAt <= c.ReceiveBy
return onTime

success = onTime
if !onTime {
reason = LateEvent
}

return success, reason
}

// FromBlueprint returns constraints for an event based on the start time and blueprints
Expand Down
8 changes: 6 additions & 2 deletions common/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
func TestOnTimeEvent(test *testing.T) {
nowEvent := getOldEvent(0)
constraints, _ := FromBlueprint(time.Now().Add(-2*time.Hour), makeConstraint("3h"))
success, reason := nowEvent.IsSuccessful(constraints)

assert.True(test, nowEvent.IsSuccessful(constraints))
assert.True(test, success)
assert.Equal(test, "", reason)
}

func TestLateEvent(test *testing.T) {
nowEvent := getOldEvent(0)
constraints, _ := FromBlueprint(time.Now().Add(-2*time.Hour), makeConstraint("1h"))
success, reason := nowEvent.IsSuccessful(constraints)

assert.False(test, nowEvent.IsSuccessful(constraints))
assert.False(test, success)
assert.Equal(test, LateEvent, reason)
}

func getOldEvent(secondsAgo int64) Event {
Expand Down
6 changes: 3 additions & 3 deletions schedule/handlers.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package schedule

// Handle is the email handler's implementation of the Handler interface.
func (h EmailHandlerNode) Handle() error {
return nil
func (h EmailHandlerNode) Handle(ctx *Context) {

}

// Name is the email handlers implementation of the Node interface.
Expand All @@ -11,6 +11,6 @@ func (h EmailHandlerNode) Name() string {
}

// Next defines what's after this node completes.
func (h EmailHandlerNode) Next() ([]*NodeInstance, error) {
func (h EmailHandlerNode) Next() ([]*NodeInstance, *Context) {
return nil, nil
}
32 changes: 22 additions & 10 deletions schedule/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@ import (
com "github.com/att/deadline/common"
)

const (
// EventNeverArrived indicates that the event never arrived
EventNeverArrived = "event did not arrive by the specified time"
)

// Name returns the name of the event node.
func (node *EventNode) Name() string {
return node.name
}

// Next returns the next nodes for an event node. Can return an empty array if
// you cannot yet move past this node.
func (node *EventNode) Next() ([]*NodeInstance, error) {
func (node *EventNode) Next() ([]*NodeInstance, *Context) {
next := make([]*NodeInstance, 0)
var successful bool
var failureReason string
var ctx *Context
var pastDue = time.Now().Unix() > node.constraints.ReceiveBy
var received = node.event != nil

Expand All @@ -26,7 +33,7 @@ func (node *EventNode) Next() ([]*NodeInstance, error) {
}

if received {
successful = node.event.IsSuccessful(node.constraints)
successful, failureReason = node.event.IsSuccessful(node.constraints)

} else if pastDue { // not received and past due
log.WithFields(logrus.Fields{
Expand All @@ -35,25 +42,22 @@ func (node *EventNode) Next() ([]*NodeInstance, error) {
"recieve-by": time.Unix(node.constraints.ReceiveBy, 0).Format(time.RFC3339),
}).Debug("node failed")

ctx = newContext(node.name, EventNeverArrived)
next = append(next, node.errorTo)
}

if successful {
next = append(next, node.okTo)
} else {
ctx = newContext(node.name, failureReason)
next = append(next, node.errorTo)
}

return next, nil
return next, ctx
}

// AddEvent adds an event to the EventNode
func (node *EventNode) AddEvent(e *com.Event) {
// if node.events == nil {
// node.events = make([]com.Event, 0)
// }

// node.events = append(node.events, e)
node.event = e
}

Expand All @@ -63,7 +67,7 @@ func (node *EndNode) Name() string {
}

// Next for an end node returns nil for both array and error
func (node *EndNode) Next() ([]*NodeInstance, error) {
func (node *EndNode) Next() ([]*NodeInstance, *Context) {
return nil, nil
}

Expand All @@ -73,8 +77,16 @@ func (node *StartNode) Name() string {
}

// Next for a start node returns an array of size 1 for it's 'to' value
func (node *StartNode) Next() ([]*NodeInstance, error) {
func (node *StartNode) Next() ([]*NodeInstance, *Context) {
next := make([]*NodeInstance, 1)
next[0] = node.to
return next, nil
}

func newContext(name string, reason string) *Context {
return &Context{
FailedNoded: name,
FailureReason: reason,
FailureTime: time.Now(),
}
}
5 changes: 3 additions & 2 deletions schedule/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func TestEventErrorTo(test *testing.T) {
} else {

node.AddEvent(&e)
next, err := node.Next()
next, ctx := node.Next()

assert.Nil(test, err, "")
assert.Equal(test, len(next), 1)
assert.Equal(test, next[0], endNode)
assert.NotNil(test, ctx)
assert.Equal(test, com.LateEvent, ctx.FailureReason)
}
}
28 changes: 16 additions & 12 deletions schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,38 @@ import (

// Evaluate the schedule completely.
func (schedule *Schedule) Evaluate() State {
schedule.walk(schedule.Start)
schedule.walk(schedule.Start, nil)

return schedule.state
}

func (schedule *Schedule) walk(instance *NodeInstance) {
if schedule.state != Running {
return
}
func (schedule *Schedule) walk(instance *NodeInstance, context *Context) {

switch node := instance.value.(type) {

case *StartNode:
schedule.walk(node.to)
schedule.walk(node.to, nil)
case *EventNode:
next, _ := node.Next()
next, ctx := node.Next()
if next != nil && len(next) > 0 {

if next[0] == node.errorTo {
schedule.state = Failed
}

schedule.walk(next[0])
schedule.walk(next[0], ctx)
}
case *EmailHandlerNode:
//handle
schedule.walk(node.to)
go node.Handle(context)
schedule.walk(node.to, nil)
case *EndNode:
if schedule.state != Failed {
schedule.state = Ended
}
case nil:
//log.Info("nil node type")
log.Debug("nil node type")
default:
//log.Info("unknown node type")
log.Debug("unknown node type")
}
}

Expand Down Expand Up @@ -308,3 +305,10 @@ func checkEmptyFields(blueprint *com.ScheduleBlueprint) error {
return nil
}
}

// func contextFromNode(node *EventNode) Context {
// ctx := Context{
// FailedNoded: node.name,
// FailureTime: time.Now(),
// }
// }
25 changes: 12 additions & 13 deletions schedule/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ type Schedule struct {
state State
}

// ScheduledHandler is the type that handles failures in a schedule
type ScheduledHandler struct {
ScheduleName string `db:"schedulename"`
Name string `db:"name"`
Address string `db:"address"`
}

// ScheduleManager is tasked with running and maintaing all the schedules. There should only be 1 per process.
// It's tasked with the creation, destruction and evaulation of all schedules.
type ScheduleManager struct {
Expand All @@ -90,16 +83,23 @@ type ScheduleManager struct {
evalTicker *time.Ticker
}

// Context is the way to pass state between different nodes in a schedule
type Context struct {
FailedNoded string
FailureReason string
FailureTime time.Time
}

// Node is the interface for nodes in the schedules and provides ways to see what they are and how they connect
// to other Nodes.
type Node interface {
Next() ([]*NodeInstance, error)
Next() ([]*NodeInstance, *Context)
Name() string
}

// Handler is the interface for handlers to implement so the can handle failures in a uniform way.
type Handler interface {
Handle() error
Handle(*Context)
}

// NodeInstance is the actual instance of a Node interface.
Expand All @@ -112,10 +112,9 @@ type NodeInstance struct {
type EventNode struct {
name string
constraints com.EventConstraints
//events []com.Event
event *com.Event
okTo *NodeInstance
errorTo *NodeInstance
event *com.Event
okTo *NodeInstance
errorTo *NodeInstance
}

// StartNode is the Node implementing type for the start of a schedule.
Expand Down

0 comments on commit 0670405

Please sign in to comment.