diff --git a/.golangci.yml b/.golangci.yml index 6e69c00..d5163f1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,7 +29,7 @@ linters: - gocritic - gocyclo - godot - - goerr113 + - err113 - gofmt - gofumpt - goimports @@ -129,7 +129,7 @@ issues: - goconst - gomnd - containedctx - - goerr113 + - err113 - errcheck - nolintlint - forcetypeassert diff --git a/drivers/pgxv4/transaction.go b/drivers/pgxv4/transaction.go index 329646f..5475740 100644 --- a/drivers/pgxv4/transaction.go +++ b/drivers/pgxv4/transaction.go @@ -2,6 +2,7 @@ package pgxv4 import ( "context" + "sync" "github.com/jackc/pgx/v4" @@ -11,10 +12,19 @@ import ( // Transaction is trm.Transaction for pgx.Tx. type Transaction struct { + mu sync.Mutex tx pgx.Tx isClosed *drivers.IsClosed } +func newDefaultTransaction(tx pgx.Tx) *Transaction { + return &Transaction{ + mu: sync.Mutex{}, + tx: tx, + isClosed: drivers.NewIsClosed(), + } +} + // NewTransaction creates trm.Transaction for pgx.Tx. func NewTransaction( ctx context.Context, @@ -26,10 +36,7 @@ func NewTransaction( return ctx, nil, err } - tr := &Transaction{ - tx: tx, - isClosed: drivers.NewIsClosed(), - } + tr := newDefaultTransaction(tx) go tr.awaitDone(ctx) @@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) { select { case <-ctx.Done(): - t.isClosed.Close() + _ = t.Rollback(ctx) case <-t.isClosed.Closed(): } } @@ -60,16 +67,15 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex return ctx, nil, err } - tr := &Transaction{ - tx: tx, - isClosed: drivers.NewIsClosed(), - } + tr := newDefaultTransaction(tx) return ctx, tr, nil } // Commit the trm.Transaction. func (t *Transaction) Commit(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() defer t.isClosed.Close() return t.tx.Commit(ctx) @@ -77,6 +83,8 @@ func (t *Transaction) Commit(ctx context.Context) error { // Rollback the trm.Transaction. func (t *Transaction) Rollback(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() defer t.isClosed.Close() return t.tx.Rollback(ctx) diff --git a/drivers/pgxv4/transaction_integration_test.go b/drivers/pgxv4/transaction_integration_test.go index 77d988f..bbc2cbf 100644 --- a/drivers/pgxv4/transaction_integration_test.go +++ b/drivers/pgxv4/transaction_integration_test.go @@ -6,7 +6,9 @@ package pgxv4_test import ( "context" "fmt" + "sync/atomic" "testing" + "time" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" @@ -33,8 +35,6 @@ func db(ctx context.Context) (*pgxpool.Pool, error) { } func TestTransaction_WithRealDB(t *testing.T) { - t.Parallel() - ctx := context.Background() pool, err := db(ctx) @@ -52,3 +52,54 @@ func TestTransaction_WithRealDB(t *testing.T) { require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed) require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed) } + +// transaction should release all resources if context is cancelled +// otherwise pool.Close() is blocked forever +func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) { + ctx := context.Background() + + pool, err := db(ctx) + require.NoError(t, err) + + defer func() { + waitPoolIsClosed(t, pool) + }() + + f := pgxv4.NewDefaultFactory(pool) + + ctx, cancel := context.WithCancel(ctx) + + _, tr, err := f(ctx, settings.Must()) + require.NoError(t, err) + + require.True(t, tr.IsActive()) + + cancel() +} + +func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) { + const checkTick = 50 * time.Millisecond + const waitDurationDeadline = 30 * time.Second + + var poolClosed atomic.Bool + poolClosed.Store(false) + + go func() { + pool.Close() + poolClosed.Store(true) + }() + + require.Eventually( + t, + func() bool { + return poolClosed.Load() + }, + waitDurationDeadline, + checkTick) + + // https://github.com/jackc/pgx/issues/1641 + // pool triggerHealthCheck leaves stranded goroutines for 500ms + // otherwise goleak error is triggered + const waitPoolHealthCheck = 500 * time.Millisecond + time.Sleep(waitPoolHealthCheck) +} diff --git a/drivers/pgxv5/transaction.go b/drivers/pgxv5/transaction.go index 3d0aa9a..027b399 100644 --- a/drivers/pgxv5/transaction.go +++ b/drivers/pgxv5/transaction.go @@ -2,6 +2,7 @@ package pgxv5 import ( "context" + "sync" "github.com/jackc/pgx/v5" @@ -11,10 +12,19 @@ import ( // Transaction is trm.Transaction for pgx.Tx. type Transaction struct { + mu sync.Mutex tx pgx.Tx isClosed *drivers.IsClosed } +func newDefaultTransaction(tx pgx.Tx) *Transaction { + return &Transaction{ + mu: sync.Mutex{}, + tx: tx, + isClosed: drivers.NewIsClosed(), + } +} + // NewTransaction creates trm.Transaction for pgx.Tx. func NewTransaction( ctx context.Context, @@ -26,10 +36,7 @@ func NewTransaction( return ctx, nil, err } - tr := &Transaction{ - tx: tx, - isClosed: drivers.NewIsClosed(), - } + tr := newDefaultTransaction(tx) go tr.awaitDone(ctx) @@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) { select { case <-ctx.Done(): - t.isClosed.Close() + _ = t.Rollback(ctx) case <-t.isClosed.Closed(): } } @@ -60,16 +67,15 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex return ctx, nil, err } - tr := &Transaction{ - tx: tx, - isClosed: drivers.NewIsClosed(), - } + tr := newDefaultTransaction(tx) return ctx, tr, nil } // Commit the trm.Transaction. func (t *Transaction) Commit(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() defer t.isClosed.Close() return t.tx.Commit(ctx) @@ -77,6 +83,8 @@ func (t *Transaction) Commit(ctx context.Context) error { // Rollback the trm.Transaction. func (t *Transaction) Rollback(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() defer t.isClosed.Close() return t.tx.Rollback(ctx) diff --git a/drivers/pgxv5/transaction_integration_test.go b/drivers/pgxv5/transaction_integration_test.go index 060ece5..272ea5e 100644 --- a/drivers/pgxv5/transaction_integration_test.go +++ b/drivers/pgxv5/transaction_integration_test.go @@ -6,7 +6,9 @@ package pgxv5_test import ( "context" "fmt" + "sync/atomic" "testing" + "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -33,8 +35,6 @@ func db(ctx context.Context) (*pgxpool.Pool, error) { } func TestTransaction_WithRealDB(t *testing.T) { - t.Parallel() - ctx := context.Background() pool, err := db(ctx) @@ -52,3 +52,54 @@ func TestTransaction_WithRealDB(t *testing.T) { require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed) require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed) } + +// transaction should release all resources if context is cancelled +// otherwise pool.Close() is blocked forever +func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) { + ctx := context.Background() + + pool, err := db(ctx) + require.NoError(t, err) + + defer func() { + waitPoolIsClosed(t, pool) + }() + + f := pgxv5.NewDefaultFactory(pool) + + ctx, cancel := context.WithCancel(ctx) + + _, tr, err := f(ctx, settings.Must()) + require.NoError(t, err) + + require.True(t, tr.IsActive()) + + cancel() +} + +func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) { + const checkTick = 50 * time.Millisecond + const waitDurationDeadline = 30 * time.Second + + var poolClosed atomic.Bool + poolClosed.Store(false) + + go func() { + pool.Close() + poolClosed.Store(true) + }() + + require.Eventually( + t, + func() bool { + return poolClosed.Load() + }, + waitDurationDeadline, + checkTick) + + // https://github.com/jackc/pgx/issues/1641 + // pool triggerHealthCheck leaves stranded goroutines for 500ms + // otherwise goleak error is triggered + const waitPoolHealthCheck = 500 * time.Millisecond + time.Sleep(waitPoolHealthCheck) +}