-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtx.go
48 lines (41 loc) · 1008 Bytes
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package database
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// TxManager defines a method to execute a transaction from Begin until Commit or Rollback.
type TxManager interface {
Begin(context.Context) (pgx.Tx, error)
Execute(context.Context, func(pgx.Tx) error) error
}
func NewTxManager(db Service) TxManager {
return &txManager{db.Pool()}
}
type txManager struct {
*pgxpool.Pool
}
func (tm *txManager) Begin(ctx context.Context) (pgx.Tx, error) {
return tm.Pool.Begin(ctx)
}
func (tm *txManager) Execute(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := tm.Begin(ctx)
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
err = tx.Rollback(ctx)
// re-throw panic after rollbacks
panic(p)
} else if err != nil {
// rollback if error happen
err = tx.Rollback(ctx)
} else {
// if Commit returns error update err with commit err
err = tx.Commit(ctx)
}
}()
err = fn(tx)
return err
}