Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store the informer events even if the agent is not connected #271

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ func (a *Agent) Start(ctx context.Context) error {
}
}()

// Wait for the app informer to be synced
err := a.appManager.EnsureSynced(waitForSyncedDuration)
if err != nil {
return fmt.Errorf("failed to sync applications: %w", err)
}

if a.remote != nil {
a.remote.SetClientMode(a.mode)
// TODO: Right now, maintainConnection always returns nil. Revisit
Expand All @@ -233,12 +239,6 @@ func (a *Agent) Start(ctx context.Context) error {

a.emitter = event.NewEventSource(fmt.Sprintf("agent://%s", "agent-managed"))

// Wait for the app informer to be synced
err := a.appManager.EnsureSynced(waitForSyncedDuration)
if err != nil {
return fmt.Errorf("failed to sync applications: %w", err)
}

return err
}

Expand Down
16 changes: 7 additions & 9 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ func (a *Agent) maintainConnection() error {
if err != nil {
log().Warnf("Could not connect to %s: %v", a.remote.Addr(), err)
} else {
err = a.queues.Create(a.remote.ClientID())
if err != nil {
log().Warnf("Could not create agent queue pair: %v", err)
} else {
a.SetConnected(true)
if !a.queues.HasQueuePair(a.remote.ClientID()) {
err = a.queues.Create(a.remote.ClientID())
if err != nil {
log().Warnf("Could not create agent queue pair: %v", err)
continue
}
}
a.SetConnected(true)
}
} else {
err = a.handleStreamEvents()
Expand Down Expand Up @@ -214,10 +216,6 @@ func (a *Agent) handleStreamEvents() error {
}

log().WithField("component", "EventHandler").Info("Stream closed")
err = a.queues.Delete(a.remote.ClientID(), true)
if err != nil {
log().Errorf("Could not remove agent queue: %v", err)
}

return nil
}
16 changes: 16 additions & 0 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error {
logCtx.Errorf("Error creating application: %v", err)
}
case event.SpecUpdate:
if !exists {
logCtx.Debug("Received an Update event for an app that doesn't exists. Creating the incoming app")
if _, err := a.createApplication(incomingApp); err != nil {
return fmt.Errorf("could not create incoming app: %w", err)
}
return nil
}

if !sourceUIDMatch {
logCtx.Debug("Source UID mismatch between the incoming app and existing app. Deleting the existing app")
if err := a.deleteApplication(incomingApp); err != nil {
Expand Down Expand Up @@ -150,6 +158,14 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error {
logCtx.Errorf("Error creating appproject: %v", err)
}
case event.SpecUpdate:
if !exists {
logCtx.Debug("Received an Update event for an appProject that doesn't exists. Creating the incoming appProject")
if _, err := a.createAppProject(incomingAppProject); err != nil {
return fmt.Errorf("could not create incoming appProject: %w", err)
}
return nil
}

if !sourceUIDMatch {
logCtx.Debug("Source UID mismatch between the incoming and existing appProject. Deleting the existing appProject")
if err := a.deleteAppProject(incomingAppProject); err != nil {
Expand Down
21 changes: 0 additions & 21 deletions agent/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) {
logCtx := log().WithField("event", "NewApp").WithField("application", app.QualifiedName())
logCtx.Debugf("New app event")

// If the agent is not connected, we ignore this event. It just makes no
// sense to fill up the send queue when we can't send.
if !a.IsConnected() {
logCtx.Trace("Agent is not connected, ignoring this event")
return
}

// Update events trigger a new event sometimes, too. If we've already seen
// the app, we just ignore the request then.
if a.appManager.IsManaged(app.QualifiedName()) {
Expand Down Expand Up @@ -66,13 +59,6 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App
return
}

// If the agent is not connected, we ignore this event. It just makes no
// sense to fill up the send queue when we can't send.
if !a.IsConnected() {
logCtx.Trace("Agent is not connected, ignoring this event")
return
}

// If the app is not managed, we ignore this event.
if !a.appManager.IsManaged(new.QualifiedName()) {
logCtx.Tracef("App is not managed")
Expand Down Expand Up @@ -109,13 +95,6 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) {
logCtx := log().WithField("event", "DeleteApp").WithField("application", app.QualifiedName())
logCtx.Debugf("Delete app event")

// If the agent is not connected, we ignore this event. It just makes no
// sense to fill up the send queue when we can't send.
if !a.IsConnected() {
logCtx.Trace("Agent is not connected, ignoring this event")
return
}

if !a.appManager.IsManaged(app.QualifiedName()) {
logCtx.Tracef("App is not managed")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,7 @@ func (k EventContextKey) String() string {
return string(k)
}

const ContextAgentIdentifier EventContextKey = "agent_name"
const (
ContextAgentIdentifier EventContextKey = "agent_name"
ContextAgentMode EventContextKey = "agent_mode"
)
25 changes: 25 additions & 0 deletions principal/apis/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/argoproj-labs/argocd-agent/internal/queue"
"github.com/argoproj-labs/argocd-agent/internal/session"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -160,6 +161,17 @@ func (s *Server) newClientConnection(ctx context.Context, timeout time.Duration)
return c, nil
}

// agentMode gets the agent mode from the context ctx. Returns an error
// if no agent mode is found in the context
func agentMode(ctx context.Context) (string, error) {
agentMode, ok := ctx.Value(types.ContextAgentMode).(string)
if !ok {
return "", fmt.Errorf("invalid context: no agent mode")
}

return agentMode, nil
}

// onDisconnect must be called whenever client c disconnects from the stream
func (s *Server) onDisconnect(c *client) {
c.lock.Lock()
Expand Down Expand Up @@ -271,6 +283,19 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
return fmt.Errorf("panic: nil item in queue")
}

mode, err := agentMode(c.ctx)
if err != nil {
return fmt.Errorf("unable to extract the agent mode from context")
}

if types.AgentModeFromString(mode) != types.AgentModeManaged {
// Only Update events are valid for unmanaged agents
if ev.Type() != event.Update.String() {
logCtx.WithField("type", ev.Type()).Debug("Discarding event for unmanaged agent")
return nil
}
}

eventWriter := s.eventWriters.Get(c.agentName)
if eventWriter == nil {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
Expand Down
43 changes: 41 additions & 2 deletions principal/apis/eventstream/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"testing"
"time"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/queue"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj-labs/argocd-agent/principal/apis/eventstream/mock"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -36,7 +37,10 @@ func Test_Subscribe(t *testing.T) {
qs := queue.NewSendRecvQueues()
qs.Create("default")
s := NewServer(qs)
st := &mock.MockEventServer{AgentName: "default"}
st := &mock.MockEventServer{
AgentName: "default",
AgentMode: string(types.AgentModeManaged),
}
st.AddRecvHook(func(s *mock.MockEventServer) error {
log().WithField("component", "RecvHook").Tracef("Entry")
ticker := time.NewTicker(500 * time.Millisecond)
Expand Down Expand Up @@ -110,6 +114,41 @@ func Test_Subscribe(t *testing.T) {
assert.Error(t, err)
})

t.Run("Test events being discarded for unmanaged agent", func(t *testing.T) {
qs := queue.NewSendRecvQueues()
qs.Create("default")
s := NewServer(qs)
st := &mock.MockEventServer{
AgentName: "default",
AgentMode: string(types.AgentModeAutonomous),
}
st.AddRecvHook(func(s *mock.MockEventServer) error {
log().WithField("component", "RecvHook").Tracef("Entry")
ticker := time.NewTicker(500 * time.Millisecond)
<-ticker.C
ticker.Stop()
log().WithField("component", "RecvHook").Tracef("Exit")
return io.EOF
})
emitter := event.NewEventSource("test")
qs.SendQ("default").Add(emitter.ApplicationEvent(
event.Create,
&v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}},
))
qs.SendQ("default").Add(emitter.ApplicationEvent(
event.Update,
&v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}},
))
qs.SendQ("default").Add(emitter.ApplicationEvent(
event.Delete,
&v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}},
))
err := s.Subscribe(st)
assert.Nil(t, err)
assert.Equal(t, 0, int(st.NumRecv.Load()))
assert.Equal(t, 1, int(st.NumSent.Load()))
})

}

func init() {
Expand Down
13 changes: 10 additions & 3 deletions principal/apis/eventstream/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type MockEventServer struct {
grpc.ServerStream

AgentName string
AgentMode string
NumSent atomic.Uint32
NumRecv atomic.Uint32
Application v1alpha1.Application
Expand All @@ -58,11 +59,17 @@ func (s *MockEventServer) AddRecvHook(hook RecvHook) {
}

func (s *MockEventServer) Context() context.Context {
ctx := context.TODO()

if s.AgentName != "" {
return context.WithValue(context.TODO(), types.ContextAgentIdentifier, s.AgentName)
} else {
return context.TODO()
ctx = context.WithValue(ctx, types.ContextAgentIdentifier, s.AgentName)
}

if s.AgentMode != "" {
ctx = context.WithValue(ctx, types.ContextAgentMode, s.AgentMode)
}

return ctx
}

func (s *MockEventServer) Send(sub *eventstreamapi.Event) error {
Expand Down
3 changes: 2 additions & 1 deletion principal/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *Server) authenticate(ctx context.Context) (context.Context, error) {

// claims at this point is validated and we can propagate values to the
// context.
authCtx := session.ClientIdToContext(ctx, agentInfo.ClientID)
authCtx := context.WithValue(session.ClientIdToContext(ctx, agentInfo.ClientID),
types.ContextAgentMode, agentInfo.Mode)
if !s.queues.HasQueuePair(agentInfo.ClientID) {
logCtx.Tracef("Creating a new queue pair for client %s", agentInfo.ClientID)
if err := s.queues.Create(agentInfo.ClientID); err != nil {
Expand Down
60 changes: 30 additions & 30 deletions principal/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@ func (s *Server) newAppCallback(outbound *v1alpha1.Application) {
"application_name": outbound.Name,
})

// Return early if no interested agent is connected
if !s.queues.HasQueuePair(outbound.Namespace) {
logCtx.Debug("No agent is connected to this queue, discarding event")
return
}

// New app events are only relevant for managed agents
mode := s.agentMode(outbound.Namespace)
if mode != types.AgentModeManaged {
logCtx.Tracef("Discarding event for unmanaged agent")
return
if err := s.queues.Create(outbound.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing namespace")
}
q := s.queues.SendQ(outbound.Namespace)
if q == nil {
Expand Down Expand Up @@ -77,8 +72,11 @@ func (s *Server) updateAppCallback(old *v1alpha1.Application, new *v1alpha1.Appl
return
}
if !s.queues.HasQueuePair(old.Namespace) {
logCtx.Tracef("No agent is connected to this queue, discarding event")
return
if err := s.queues.Create(old.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing agent namespace")
}
q := s.queues.SendQ(old.Namespace)
if q == nil {
Expand All @@ -98,13 +96,11 @@ func (s *Server) deleteAppCallback(outbound *v1alpha1.Application) {
"application_name": outbound.Name,
})
if !s.queues.HasQueuePair(outbound.Namespace) {
logCtx.Tracef("No agent is connected to this queue, discarding event")
return
}
mode := s.agentMode(outbound.Namespace)
if !mode.IsManaged() {
logCtx.Tracef("Discarding event for unmanaged agent")
return
if err := s.queues.Create(outbound.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing agent namespace")
}
q := s.queues.SendQ(outbound.Namespace)
if q == nil {
Expand All @@ -129,8 +125,11 @@ func (s *Server) newAppProjectCallback(outbound *v1alpha1.AppProject) {

// Return early if no interested agent is connected
if !s.queues.HasQueuePair(outbound.Namespace) {
logCtx.Debug("No agent is connected to this queue, discarding event")
return
if err := s.queues.Create(outbound.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing namespace")
}

// New appproject events are only relevant for managed agents
Expand Down Expand Up @@ -172,8 +171,11 @@ func (s *Server) updateAppProjectCallback(old *v1alpha1.AppProject, new *v1alpha
return
}
if !s.queues.HasQueuePair(old.Namespace) {
logCtx.Tracef("No agent is connected to this queue, discarding event")
return
if err := s.queues.Create(old.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing agent namespace")
}
q := s.queues.SendQ(old.Namespace)
if q == nil {
Expand All @@ -193,13 +195,11 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) {
"appproject_name": outbound.Name,
})
if !s.queues.HasQueuePair(outbound.Namespace) {
logCtx.Tracef("No agent is connected to this queue, discarding event")
return
}
mode := s.agentMode(outbound.Namespace)
if !mode.IsManaged() {
logCtx.Tracef("Discarding event for unmanaged agent")
return
if err := s.queues.Create(outbound.Namespace); err != nil {
logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace")
return
}
logCtx.Trace("Created a new queue pair for the existing agent namespace")
}
q := s.queues.SendQ(outbound.Namespace)
if q == nil {
Expand Down
Loading
Loading