Skip to content

Commit

Permalink
feat: add --pid-file option to write PID files (#25499)
Browse files Browse the repository at this point in the history
Add `--pid-file` option to write PID files on startup. The PID filename
is specified by the argument after `--pid-file`. If the PID file already exists, influxd will exit unless the `--overwrite-pid-file` flag is also used.

Example: `influxd --pid-file /var/lib/influxd/influxd.pid`

PID files are automatically removed when the influxd process is shutdown.

Closes: #25498
(cherry picked from commit c35321b)
(cherry picked from commit 48f7600)
  • Loading branch information
gwossum authored Oct 29, 2024
1 parent 981f2fc commit e9ea8cf
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 1 deletion.
18 changes: 18 additions & 0 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ type InfluxdOpts struct {
TracingType string
ReportingDisabled bool

PIDFile string
OverwritePIDFile bool

AssetsPath string
BoltPath string
SqLitePath string
Expand Down Expand Up @@ -213,6 +216,9 @@ func NewOpts(viper *viper.Viper) *InfluxdOpts {
FluxLogEnabled: false,
ReportingDisabled: false,

PIDFile: "",
OverwritePIDFile: false,

BoltPath: filepath.Join(dir, bolt.DefaultFilename),
SqLitePath: filepath.Join(dir, sqlite.DefaultFilename),
EnginePath: filepath.Join(dir, "engine"),
Expand Down Expand Up @@ -325,6 +331,18 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
Default: o.ReportingDisabled,
Desc: "disable sending telemetry data to https://telemetry.influxdata.com every 8 hours",
},
{
DestP: &o.PIDFile,
Flag: "pid-file",
Default: o.PIDFile,
Desc: "write process ID to a file",
},
{
DestP: &o.OverwritePIDFile,
Flag: "overwrite-pid-file",
Default: o.OverwritePIDFile,
Desc: "overwrite PID file if it already exists instead of exiting",
},
{
DestP: &o.SessionLength,
Flag: "session-length",
Expand Down
79 changes: 79 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"crypto/tls"
"errors"
"fmt"
"io/fs"
"net"
nethttp "net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -109,6 +111,11 @@ const (
JaegerTracing = "jaeger"
)

var (
// ErrPIDFileExists indicates that a PID file already exists.
ErrPIDFileExists = errors.New("PID file exists (possible unclean shutdown or another instance already running)")
)

type labeledCloser struct {
label string
closer func(context.Context) error
Expand Down Expand Up @@ -248,6 +255,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
}
}

if err := m.writePIDFile(opts.PIDFile, opts.OverwritePIDFile); err != nil {
return fmt.Errorf("error writing PIDFile %q: %w", opts.PIDFile, err)
}

m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry")))
m.reg.MustRegister(collectors.NewGoCollector())

Expand Down Expand Up @@ -970,6 +981,74 @@ func (m *Launcher) initTracing(opts *InfluxdOpts) {
}
}

// writePIDFile will write the process ID to pidFilename and register a cleanup function to delete it during
// shutdown. If pidFilename is empty, then no PID file is written and no cleanup function is registered.
// If pidFilename already exists and overwrite is false, then pidFilename is not overwritten and a
// ErrPIDFileExists error is returned. If pidFilename already exists and overwrite is true, then pidFilename
// will be overwritten but a warning will be logged.
func (m *Launcher) writePIDFile(pidFilename string, overwrite bool) error {
if pidFilename == "" {
return nil
}

// Create directory to PIDfile if needed.
if err := os.MkdirAll(filepath.Dir(pidFilename), 0777); err != nil {
return fmt.Errorf("mkdir: %w", err)
}

// Write PID to file, but don't clobber an existing PID file.
pidBytes := []byte(strconv.Itoa(os.Getpid()))
pidMode := fs.FileMode(0666)
openFlags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
pidFile, err := os.OpenFile(pidFilename, openFlags|os.O_EXCL, pidMode)
if err != nil {
if !errors.Is(err, fs.ErrExist) {
return fmt.Errorf("open file: %w", err)
}
if !overwrite {
return ErrPIDFileExists
} else {
m.log.Warn("PID file already exists, attempting to overwrite", zap.String("pidFile", pidFilename))
pidFile, err = os.OpenFile(pidFilename, openFlags, pidMode)
if err != nil {
return fmt.Errorf("overwrite file: %w", err)
}
}
}
_, writeErr := pidFile.Write(pidBytes) // Contract says Write must return an error if count < len(pidBytes).
closeErr := pidFile.Close() // always close the file
if writeErr != nil || closeErr != nil {
var errs []error
if writeErr != nil {
errs = append(errs, fmt.Errorf("write file: %w", writeErr))
}
if closeErr != nil {
errs = append(errs, fmt.Errorf("close file: %w", closeErr))
}

// Let's make sure we don't leave a PID file behind on error.
removeErr := os.Remove(pidFilename)
if removeErr != nil {
errs = append(errs, fmt.Errorf("remove file: %w", removeErr))
}

return errors.Join(errs...)
}

// Add a cleanup function.
m.closers = append(m.closers, labeledCloser{
label: "pidfile",
closer: func(context.Context) error {
if err := os.Remove(pidFilename); err != nil {
return fmt.Errorf("removing PID file %q: %w", pidFilename, err)
}
return nil
},
})

return nil
}

// openMetaStores opens the embedded DBs used to store metadata about influxd resources, migrating them to
// the latest schema expected by the server.
// On success, a unique ID is returned to be used as an identifier for the influxd instance in telemetry.
Expand Down
7 changes: 6 additions & 1 deletion cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type TestLauncher struct {
Bucket *influxdb.Bucket
Auth *influxdb.Authorization

Logger *zap.Logger

httpClient *httpc.Client
apiClient *api.APIClient

Expand Down Expand Up @@ -146,7 +148,10 @@ func (tl *TestLauncher) Run(tb zaptest.TestingT, ctx context.Context, setters ..
}

// Set up top-level logger to write into the test-case.
tl.Launcher.log = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name()))
if tl.Logger == nil {
tl.Logger = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name()))
}
tl.Launcher.log = tl.Logger
return tl.Launcher.run(ctx, opts)
}

Expand Down
135 changes: 135 additions & 0 deletions cmd/influxd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package launcher_test
import (
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
nethttp "net/http"
"os"
"path/filepath"
"strconv"
"testing"
"time"

Expand All @@ -14,6 +19,10 @@ import (
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

// Default context.
Expand Down Expand Up @@ -164,3 +173,129 @@ func TestLauncher_PingHeaders(t *testing.T) {
assert.Equal(t, []string{"OSS"}, resp.Header.Values("X-Influxdb-Build"))
assert.Equal(t, []string{"dev"}, resp.Header.Values("X-Influxdb-Version"))
}

func TestLauncher_PIDFile(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")

l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
})
defer func() {
l.ShutdownOrFail(t, ctx)
require.NoFileExists(t, pidFilename)
}()

require.FileExists(t, pidFilename)
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(os.Getpid()), string(pidBytes))
}

func TestLauncher_PIDFile_Locked(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher.
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)
origSt, err := os.Stat(pidFilename)
require.NoError(t, err)

// Make sure we get an error about the PID file from the launcher
l := launcher.NewTestLauncher()
err = l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.FileExists(t, pidFilename)
contents, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, lockContents, contents)
curSt, err := os.Stat(pidFilename)
require.NoError(t, err)

// We can't compare origSt and curSt directly because even on mounts
// with "noatime" or "relatime" options, the sys.Atim field can still
// change. We'll just compare the most relevant exposed fields.
require.Equal(t, origSt.ModTime(), curSt.ModTime())
require.Equal(t, origSt.Mode(), curSt.Mode())
}()

require.ErrorIs(t, err, launcher.ErrPIDFileExists)
require.ErrorContains(t, err, fmt.Sprintf("error writing PIDFile %q: PID file exists (possible unclean shutdown or another instance already running)", pidFilename))
}

func TestLauncher_PIDFile_Overwrite(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher (or not in this case).
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)

// Make sure we get an error about the PID file from the launcher.
l := launcher.NewTestLauncher()
loggerCore, ol := observer.New(zap.WarnLevel)
l.Logger = zap.New(loggerCore)
err := l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
o.OverwritePIDFile = true
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.NoFileExists(t, pidFilename)
}()
require.NoError(t, err)

expLogs := []observer.LoggedEntry{
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "PID file already exists, attempting to overwrite"},
Context: []zapcore.Field{zap.String("pidFile", pidFilename)},
},
}
require.Equal(t, expLogs, ol.AllUntimed())
require.FileExists(t, pidFilename)
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(os.Getpid()), string(pidBytes))
}

func TestLauncher_PIDFile_OverwriteFail(t *testing.T) {
if os.Geteuid() == 0 {
t.Skip("test will fail when run as root")
}

pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher.
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)
require.NoError(t, os.Chmod(pidFilename, 0000))

// Make sure we get an error about the PID file from the launcher
l := launcher.NewTestLauncher()
err := l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
o.OverwritePIDFile = true
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.NoError(t, os.Chmod(pidFilename, 0644))
require.FileExists(t, pidFilename)
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, lockContents, pidBytes)
}()

require.ErrorContains(t, err, fmt.Sprintf("error writing PIDFile %[1]q: overwrite file: open %[1]s:", pidFilename))
require.ErrorIs(t, err, fs.ErrPermission)
}

0 comments on commit e9ea8cf

Please sign in to comment.