Skip to content

Commit

Permalink
fix: finish events refactor (#509)
Browse files Browse the repository at this point in the history
The events refactor from several weeks ago, #458, introduced postgres
triggers as a means of publishing events. But it was never 100%
completed, leaving behind many `Publish()` calls that do nothing.

Furthermore, the refactor introduced a regression: the agent event
subscriber was not also refactored and therefore it no longer received
cancelation events. When a user canceled a run, the run status would be
set to canceled but the terraform process would not receive an interrupt
signal. This PR fixes this, and introduces an integration test to
prevent re-occurance of the regression.
  • Loading branch information
leg100 authored Jul 8, 2023
1 parent d85ac43 commit 096933a
Show file tree
Hide file tree
Showing 34 changed files with 221 additions and 212 deletions.
24 changes: 14 additions & 10 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type agent struct {
client.Client
logr.Logger

spooler // spools new run events
*terminator // terminates runs
Downloader // terraform cli downloader
spooler // spools new run events
*terminator // terminates runs
Downloader // terraform cli downloader
terraformPathFinder // determines destination dir for terraform bins

envs []string // terraform environment variables
}
Expand All @@ -61,14 +62,17 @@ func NewAgent(logger logr.Logger, app client.Client, cfg Config) (*agent, error)
logger.V(0).Info("enabled debug mode")
}

pathFinder := newTerraformPathFinder(cfg.TerraformBinDir)

agent := &agent{
Client: app,
Config: cfg,
Logger: logger,
envs: DefaultEnvs,
spooler: newSpooler(app, logger, cfg),
terminator: newTerminator(),
Downloader: newTerraformDownloader(),
Client: app,
Config: cfg,
Logger: logger,
envs: DefaultEnvs,
spooler: newSpooler(app, logger, cfg),
terminator: newTerminator(),
Downloader: newTerraformDownloader(pathFinder),
terraformPathFinder: pathFinder,
}

if cfg.PluginCache {
Expand Down
13 changes: 7 additions & 6 deletions internal/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
type (
// Config is configuration for an agent.
Config struct {
Organization *string // only process runs belonging to org
External bool // dedicated agent (true) or integrated into otfd (false)
Concurrency int // number of workers
Sandbox bool // isolate privileged ops within sandbox
Debug bool // toggle debug mode
PluginCache bool // toggle use of terraform's shared plugin cache
Organization *string // only process runs belonging to org
External bool // dedicated agent (true) or integrated into otfd (false)
Concurrency int // number of workers
Sandbox bool // isolate privileged ops within sandbox
Debug bool // toggle debug mode
PluginCache bool // toggle use of terraform's shared plugin cache
TerraformBinDir string // destination directory for terraform binaries
}
// ExternalConfig is configuration for an external agent
ExternalConfig struct {
Expand Down
22 changes: 9 additions & 13 deletions internal/agent/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ const HashicorpReleasesHost = "releases.hashicorp.com"
type (
// terraformDownloader downloads terraform binaries
terraformDownloader struct {
// server hosting binaries
host string
// used to lookup destination path for saving download
terraform
// client for downloading from server via http
client *http.Client
// mutex channel
mu chan struct{}
host string // server hosting binaries
terraformPathFinder // used to lookup destination path for saving download
client *http.Client // client for downloading from server via http
mu chan struct{} // ensures only one download at a time
}

// Downloader downloads a specific version of a binary and returns its path
Expand All @@ -33,15 +29,15 @@ type (
}
)

func newTerraformDownloader() *terraformDownloader {
func newTerraformDownloader(pathFinder terraformPathFinder) *terraformDownloader {
mu := make(chan struct{}, 1)
mu <- struct{}{}

return &terraformDownloader{
host: HashicorpReleasesHost,
terraform: &terraformPathFinder{},
client: &http.Client{},
mu: mu,
host: HashicorpReleasesHost,
terraformPathFinder: pathFinder,
client: &http.Client{},
mu: mu,
}
}

Expand Down
13 changes: 2 additions & 11 deletions internal/agent/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -25,9 +24,9 @@ func TestDownloader(t *testing.T) {
u, err := url.Parse(srv.URL)
require.NoError(t, err)

dl := newTerraformDownloader()
pathFinder := newTerraformPathFinder(t.TempDir())
dl := newTerraformDownloader(pathFinder)
dl.host = u.Host
dl.terraform = &fakeTerraform{t.TempDir()}
dl.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Expand All @@ -43,11 +42,3 @@ func TestDownloader(t *testing.T) {
assert.Equal(t, "I am a fake terraform binary\n", string(tfbin))
assert.Equal(t, "downloading terraform, version 1.2.3\n", buf.String())
}

type fakeTerraform struct {
dir string
}

func (f *fakeTerraform) TerraformPath(version string) string {
return path.Join(f.dir, version, "terraform")
}
31 changes: 14 additions & 17 deletions internal/agent/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,10 @@ type environment struct {
func newEnvironment(
ctx context.Context,
logger logr.Logger,
svc client.Client,
agent *agent,
run *run.Run,
envs []string,
downloader Downloader,
cfg Config,
) (*environment, error) {
ws, err := svc.GetWorkspace(ctx, run.WorkspaceID)
ws, err := agent.GetWorkspace(ctx, run.WorkspaceID)
if err != nil {
return nil, errors.Wrap(err, "retrieving workspace")
}
Expand All @@ -62,17 +59,17 @@ func newEnvironment(
// via an environment variable.
//
// NOTE: environment variable support is only available in terraform >= 1.2.0
token, err := svc.CreateRunToken(ctx, tokens.CreateRunTokenOptions{
token, err := agent.CreateRunToken(ctx, tokens.CreateRunTokenOptions{
Organization: &ws.Organization,
RunID: &run.ID,
})
if err != nil {
return nil, errors.Wrap(err, "creating registry session")
}
envs = append(envs, internal.CredentialEnv(svc.Hostname(), token))
envs := internal.SafeAppend(agent.envs, internal.CredentialEnv(agent.Hostname(), token))

// retrieve workspace variables and add them to the environment
variables, err := svc.ListVariables(ctx, run.WorkspaceID)
variables, err := agent.ListVariables(ctx, run.WorkspaceID)
if err != nil {
return nil, errors.Wrap(err, "retrieving workspace variables")
}
Expand All @@ -86,25 +83,25 @@ func newEnvironment(
writer := logs.NewPhaseWriter(ctx, logs.PhaseWriterOptions{
RunID: run.ID,
Phase: run.Phase(),
Writer: svc,
Writer: agent,
})

env := &environment{
Logger: logger,
Client: svc,
Downloader: downloader,
Client: agent,
Downloader: agent,
out: writer,
workdir: wd,
variables: variables,
ctx: ctx,
runner: &runner{out: writer},
executor: &executor{
Config: cfg,
terraform: &terraformPathFinder{},
version: ws.TerraformVersion,
out: writer,
envs: envs,
workdir: wd,
Config: agent.Config,
terraformPathFinder: agent.terraformPathFinder,
version: ws.TerraformVersion,
out: writer,
envs: envs,
workdir: wd,
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/agent/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type (
// executor executes processes.
executor struct {
Config
terraform
terraformPathFinder

version string // terraform cli version
out io.Writer
Expand Down
5 changes: 2 additions & 3 deletions internal/agent/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (s *spoolerDaemon) reinitialize(ctx context.Context) error {
// whereas we want oldest first.
for i := len(existing) - 1; i >= 0; i-- {
s.handleEvent(pubsub.Event{
Type: pubsub.EventRunStatusUpdate,
Payload: existing[i],
})
}
Expand Down Expand Up @@ -139,9 +138,9 @@ func (s *spoolerDaemon) handleRun(event pubsub.EventType, run *run.Run) {

if run.Queued() {
s.queue <- run
} else if event == pubsub.EventRunCancel {
} else if run.Status == internal.RunCanceled {
s.cancelations <- cancelation{Run: run}
} else if event == pubsub.EventRunForceCancel {
} else if run.Status == internal.RunForceCanceled {
s.cancelations <- cancelation{Run: run, Forceful: true}
}
}
10 changes: 4 additions & 6 deletions internal/agent/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func TestSpooler(t *testing.T) {
db := []*run.Run{run1, run2}
events := make(chan pubsub.Event, 3)
events <- pubsub.Event{Payload: run3}
events <- pubsub.Event{Type: pubsub.EventRunCancel, Payload: run4}
events <- pubsub.Event{Type: pubsub.EventRunForceCancel, Payload: run5}
events <- pubsub.Event{Payload: run4}
events <- pubsub.Event{Payload: run5}

spooler := newSpooler(
&fakeSpoolerApp{runs: db, events: events},
Expand Down Expand Up @@ -100,21 +100,19 @@ func TestSpooler_handleEvent(t *testing.T) {
{
name: "handle cancelation",
event: pubsub.Event{
Type: pubsub.EventRunCancel,
Payload: &run.Run{
ExecutionMode: workspace.RemoteExecutionMode,
Status: internal.RunPlanning,
Status: internal.RunCanceled,
},
},
wantCancelation: true,
},
{
name: "handle forceful cancelation",
event: pubsub.Event{
Type: pubsub.EventRunForceCancel,
Payload: &run.Run{
ExecutionMode: workspace.RemoteExecutionMode,
Status: internal.RunPlanning,
Status: internal.RunForceCanceled,
},
},
wantForceCancelation: true,
Expand Down
24 changes: 17 additions & 7 deletions internal/agent/terraform.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@ import (
"path"
)

type terraform interface {
TerraformPath(version string) string
}
var defaultTerraformBinDir = path.Join(os.TempDir(), "otf-terraform-bins")

type (
terraformPathFinder struct {
dest string
}
)

type terraformPathFinder struct{}
func newTerraformPathFinder(dest string) terraformPathFinder {
if dest == "" {
dest = defaultTerraformBinDir
}
return terraformPathFinder{
dest: dest,
}
}

// TerraformPath returns the path to a given version of the terraform binary
func (*terraformPathFinder) TerraformPath(version string) string {
return path.Join(os.TempDir(), "otf-terraform-bins", version, "terraform")
func (t terraformPathFinder) TerraformPath(version string) string {
return path.Join(t.dest, version, "terraform")
}
5 changes: 1 addition & 4 deletions internal/agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ func (w *worker) handle(ctx context.Context, r *run.Run) {
env, err := newEnvironment(
ctx,
log,
w.Client,
w.agent,
r,
w.envs,
w.Downloader,
w.Config,
)
if err != nil {
log.Error(err, "creating execution environment")
Expand Down
4 changes: 2 additions & 2 deletions internal/api/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAPI_Watch(t *testing.T) {
// send one event and then close
in <- pubsub.Event{
Payload: &run.Run{ID: "run-123"},
Type: pubsub.EventRunCreated,
Type: pubsub.CreatedEvent,
}
close(in)

Expand All @@ -43,7 +43,7 @@ func TestAPI_Watch(t *testing.T) {
got = strings.TrimSpace(got)
parts := strings.Split(got, "\n")
if assert.Equal(t, 2, len(parts)) {
assert.Equal(t, "event: run_created", parts[1])
assert.Equal(t, "event: created", parts[1])
if assert.Regexp(t, `data: .*`, parts[0]) {
data := strings.TrimPrefix(parts[0], "data: ")
// base64 decode
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/daemon_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (s *testDaemon) tfcliWithError(t *testing.T, ctx context.Context, command,
cmd := exec.Command("terraform", cmdargs...)
cmd.Dir = configPath

cmd.Env = appendSharedEnvs(internal.CredentialEnv(s.Hostname(), token))
cmd.Env = internal.SafeAppend(sharedEnvs, internal.CredentialEnv(s.Hostname(), token))

out, err := cmd.CombinedOutput()
return string(out), err
Expand Down
8 changes: 0 additions & 8 deletions internal/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,3 @@ func setenv(name, value string) (func(), error) {
os.Unsetenv(name)
}, nil
}

// appendSharedEnvs appends environment variables to the shared environment
// variables in a thread-safe manner.
func appendSharedEnvs(envs ...string) []string {
dst := make([]string, len(sharedEnvs)+len(envs))
copy(dst, sharedEnvs)
return append(dst, envs...)
}
Loading

0 comments on commit 096933a

Please sign in to comment.