Skip to content

Commit

Permalink
cmd/worklog: wire pgstore in to program
Browse files Browse the repository at this point in the history
  • Loading branch information
kortschak committed Sep 28, 2024
1 parent 2c7c3ea commit bf380e9
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 87 deletions.
23 changes: 21 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ jobs:
PGPORT: 5432
PGUSER: test_user
PGPASSWORD: password
POSTGRES_DB: postgres

steps:
- name: install Go
Expand All @@ -111,14 +112,32 @@ jobs:
with:
fetch-depth: 1

- name: unit tests postgres
- name: non-Go linux dependencies
run: |
sudo apt-get update
sudo apt-get install -qq libudev-dev
- name: set up postgres users
run: |
psql --host $PGHOST \
--username="postgres" \
--dbname="postgres" \
--command="CREATE USER $PGUSER PASSWORD '$PGPASSWORD'" \
--command="ALTER USER $PGUSER CREATEDB" \
--command="CREATE USER ${PGUSER}_ro PASSWORD '$PGPASSWORD'" \
--command="CREATE USER ${PGUSER}_ro PASSWORD '${PGPASSWORD}_ro'" \
--command="\du"
echo ${PGHOST}:${PGPORT}:*:${PGUSER}:${PGPASSWORD} >> ~/.pgpass
echo ${PGHOST}:${PGPORT}:*:${PGUSER}_ro:${PGPASSWORD}_ro >> ~/.pgpass
chmod 600 ~/.pgpass
- name: unit tests postgres
run: |
go test ./cmd/worklog/pgstore
- name: integration tests postgres
uses: nick-fields/retry@v3
with:
timeout_minutes: 10
max_attempts: 3
command: |
go test -tags postgres -run TestScripts/worklog_load_postgres
8 changes: 7 additions & 1 deletion cmd/worklog/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# `worklog`

`worklog` is a module that records screen activity, screen-saver lock state and AFK status. It takes messages from the `watcher` module and records them in an SQLite database and serves a small dashboard page that shows work activity.
`worklog` is a module that records screen activity, screen-saver lock state and AFK status. It takes messages from the `watcher` module and records them in an SQLite or PostgreSQL database and serves a small dashboard page that shows work activity.

Example configuration fragment (requires a kernel configuration fragment):
```
Expand Down Expand Up @@ -187,3 +187,9 @@ The CEL environment enables the CEL [optional types library](https://pkg.go.dev/
## CEL extensions

The CEL environment provides the [`Lib`](https://pkg.go.dev/github.com/kortschak/dex/internal/celext#Lib) and [`StateLib`](https://pkg.go.dev/github.com/kortschak/dex/internal/celext#StateLib) extensions from the celext package. `StateLib` is only available in `module.*.options.rules.*.src`.

## PostgreSQL store

When using PostgreSQL as a store, the `~/.pgpass` file MAY be used for password look-up for the primary connection to the database and MUST be used for the read-only connection.

The read-only connection is made on start-up. Before connection, the read-only user, which is `${PGUSER}_ro` where `${PGUSER}` is the user for the primary connection, is checked for its ability to read the tables used by the store and for the ability to do any non-SELECT operations. If the user cannot read the tables, a warning is emitted, but the connection is made. If non-SELECT operations are allowed for the user, or the user can read other tables, no connection is made. Since this check is only made at start-up, there is a TOCTOU concern here, but exploiting this would require having user ALTER and GRANT grants at which point you have already lost the game.
140 changes: 93 additions & 47 deletions cmd/worklog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ type daemon struct {

rMu sync.Mutex
lastEvents map[string]*worklog.Event
dMu sync.Mutex // tMu is only used for protecting configuration of db.
db atomicIfaceValue[storage]

lastReport map[rpc.UID]worklog.Report
Expand Down Expand Up @@ -359,7 +360,7 @@ func (d *daemon) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
}
}

databaseDir, err := dbDir(m.Body)
scheme, databaseDir, err := dbDir(m.Body)
if err != nil {
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.Any("error", err))
return nil, rpc.NewError(rpc.ErrCodeInvalidMessage,
Expand All @@ -371,50 +372,86 @@ func (d *daemon) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
},
)
}
if databaseDir != "" {
dir, err := xdg.State(databaseDir)
switch err {
case nil:
case syscall.ENOENT:
var ok bool
dir, ok = xdg.StateHome()
if !ok {
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.String("error", "no XDG_STATE_HOME"))
return nil, err
d.dMu.Lock()
defer d.dMu.Unlock()
switch {
default:
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.String("error", "unknown scheme"), slog.String("url", m.Body.Options.Database))
return nil, rpc.NewError(rpc.ErrCodeInvalidMessage,
err.Error(),
map[string]any{
"type": rpc.ErrCodeParameters,
"database": m.Body.Options.Database,
},
)
case scheme == "":
// Do nothing.
case scheme == "postgres", scheme == "sqlite":
var (
open opener
addr string
)
switch {
case scheme == "postgres":
addr = m.Body.Options.Database
open = func(ctx context.Context, addr, host string) (storage, error) {
db, err := pgstore.Open(ctx, addr, host)
if _, ok := err.(pgstore.Warning); ok {
d.log.LogAttrs(ctx, slog.LevelWarn, "configure database", slog.Any("error", err))
err = nil
}
return db, err
}
dir = filepath.Join(dir, databaseDir)
err = os.Mkdir(dir, 0o750)
if err != nil {
err := err.(*os.PathError) // See godoc for os.Mkdir for why this is safe.
d.log.LogAttrs(ctx, slog.LevelError, "create database dir", slog.Any("error", err))
return nil, rpc.NewError(rpc.ErrCodeInternal,

case scheme == "sqlite":
dir, err := xdg.State(databaseDir)
switch err {
case nil:
case syscall.ENOENT:
var ok bool
dir, ok = xdg.StateHome()
if !ok {
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.String("error", "no XDG_STATE_HOME"))
return nil, err
}
dir = filepath.Join(dir, databaseDir)
err = os.Mkdir(dir, 0o750)
if err != nil {
err := err.(*os.PathError) // See godoc for os.Mkdir for why this is safe.
d.log.LogAttrs(ctx, slog.LevelError, "create database dir", slog.Any("error", err))
return nil, rpc.NewError(rpc.ErrCodeInternal,
err.Error(),
map[string]any{
"type": rpc.ErrCodePath,
"op": err.Op,
"path": err.Path,
"err": fmt.Sprint(err.Err),
},
)
}
default:
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.Any("error", err))
return nil, jsonrpc2.NewError(
rpc.ErrCodeInternal,
err.Error(),
map[string]any{
"type": rpc.ErrCodePath,
"op": err.Op,
"path": err.Path,
"err": fmt.Sprint(err.Err),
},
)
}
default:
d.log.LogAttrs(ctx, slog.LevelError, "configure database", slog.Any("error", err))
return nil, jsonrpc2.NewError(
rpc.ErrCodeInternal,
err.Error(),
)

addr = filepath.Join(dir, "db.sqlite")
open = func(ctx context.Context, addr, host string) (storage, error) {
return store.Open(ctx, addr, host)
}
}

path := filepath.Join(dir, "db.sqlite")
if db := d.db.Load(); db == nil || path != db.Name() {
err = d.openDB(ctx, db, path, m.Body.Options.Hostname)
if db := d.db.Load(); !sameDB(db, addr) {
err = d.openDB(ctx, db, open, addr, m.Body.Options.Hostname)
if err != nil {
return nil, rpc.NewError(rpc.ErrCodeInternal,
err.Error(),
map[string]any{
"type": rpc.ErrCodeStoreErr,
"op": "open",
"path": path,
"name": addr,
},
)
}
Expand Down Expand Up @@ -444,34 +481,41 @@ func (d *daemon) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
}
}

func dbDir(cfg worklog.Config) (string, error) {
func dbDir(cfg worklog.Config) (scheme, dir string, err error) {
opt := cfg.Options
if opt.Database == "" {
return opt.DatabaseDir, nil
if opt.DatabaseDir != "" {
scheme = "sqlite"
}
return scheme, opt.DatabaseDir, nil
}
u, err := url.Parse(opt.Database)
if err != nil {
return "", err
return "", "", err
}
switch u.Scheme {
case "":
return "", errors.New("missing scheme in database configuration")
return "", "", errors.New("missing scheme in database configuration")
case "sqlite":
if opt.DatabaseDir != "" && u.Opaque != opt.DatabaseDir {
return "", fmt.Errorf("inconsistent database directory configuration: (%s:)%s != %s", u.Scheme, u.Opaque, opt.DatabaseDir)
return "", "", fmt.Errorf("inconsistent database directory configuration: (%s:)%s != %s", u.Scheme, u.Opaque, opt.DatabaseDir)
}
if u.Opaque == "" {
return "", fmt.Errorf("sqlite configuration missing opaque data: %s", opt.Database)
return "", "", fmt.Errorf("sqlite configuration missing opaque data: %s", opt.Database)
}
return u.Opaque, nil
return u.Scheme, u.Opaque, nil
default:
if opt.DatabaseDir != "" {
return "", fmt.Errorf("inconsistent database configuration: both %s database and sqlite directory configured", u.Scheme)
return "", "", fmt.Errorf("inconsistent database configuration: both %s database and sqlite directory configured", u.Scheme)
}
return "", nil
return u.Scheme, "", nil
}
}

func sameDB(db storage, name string) bool {
return db != nil && name == db.Name()
}

func (d *daemon) replaceTimezone(ctx context.Context, dynamic *bool) {
if dynamic == nil {
return
Expand Down Expand Up @@ -579,26 +623,28 @@ func (d *daemon) configureRules(ctx context.Context, rules map[string]worklog.Ru
d.rules.Store(ruleDetails)
}

func (d *daemon) openDB(ctx context.Context, db storage, path, hostname string) error {
type opener = func(ctx context.Context, addr, hostname string) (storage, error)

func (d *daemon) openDB(ctx context.Context, db storage, open opener, addr, hostname string) error {
if db != nil {
d.log.LogAttrs(ctx, slog.LevelInfo, "close database", slog.String("path", db.Name()))
d.log.LogAttrs(ctx, slog.LevelInfo, "close database", slog.String("name", db.Name()))
d.db.Store((storage)(nil))
db.Close(ctx)
}
// store.Open may need to get the hostname, which may
// An opener may need to get the hostname, which may
// wait indefinitely due to network unavailability.
// So make a timeout and allow the fallback to the
// kernel-provided hostname. This fallback is
// implemented by store.Open.
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
db, err := store.Open(ctx, path, hostname)
db, err := open(ctx, addr, hostname)
if err != nil {
d.log.LogAttrs(ctx, slog.LevelError, "open database", slog.Any("error", err))
return err
}
d.db.Store(db)
d.log.LogAttrs(ctx, slog.LevelInfo, "open database", slog.String("path", path))
d.log.LogAttrs(ctx, slog.LevelInfo, "open database", slog.String("name", addr))
return nil
}

Expand Down
Loading

0 comments on commit bf380e9

Please sign in to comment.