Skip to content

Commit

Permalink
added pgx rollback if context cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
diugaev committed Jul 28, 2024
1 parent fc0b7ba commit c4e96cb
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ linters:
- gocritic
- gocyclo
- godot
- goerr113
- err113
- gofmt
- gofumpt
- goimports
Expand Down Expand Up @@ -129,7 +129,7 @@ issues:
- goconst
- gomnd
- containedctx
- goerr113
- err113
- errcheck
- nolintlint
- forcetypeassert
Expand Down
26 changes: 17 additions & 9 deletions drivers/pgxv4/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pgxv4

import (
"context"
"sync"

"github.com/jackc/pgx/v4"

Expand All @@ -11,10 +12,19 @@ import (

// Transaction is trm.Transaction for pgx.Tx.
type Transaction struct {
mu sync.Mutex
tx pgx.Tx
isClosed *drivers.IsClosed
}

func newDefaultTransaction(tx pgx.Tx) *Transaction {
return &Transaction{
mu: sync.Mutex{},
tx: tx,
isClosed: drivers.NewIsClosed(),
}
}

// NewTransaction creates trm.Transaction for pgx.Tx.
func NewTransaction(
ctx context.Context,
Expand All @@ -26,10 +36,7 @@ func NewTransaction(
return ctx, nil, err
}

tr := &Transaction{
tx: tx,
isClosed: drivers.NewIsClosed(),
}
tr := newDefaultTransaction(tx)

go tr.awaitDone(ctx)

Expand All @@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) {

select {
case <-ctx.Done():
t.isClosed.Close()
_ = t.Rollback(ctx)
case <-t.isClosed.Closed():
}
}
Expand All @@ -60,23 +67,24 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex
return ctx, nil, err
}

tr := &Transaction{
tx: tx,
isClosed: drivers.NewIsClosed(),
}
tr := newDefaultTransaction(tx)

return ctx, tr, nil
}

// Commit the trm.Transaction.
func (t *Transaction) Commit(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
defer t.isClosed.Close()

return t.tx.Commit(ctx)
}

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
defer t.isClosed.Close()

return t.tx.Rollback(ctx)
Expand Down
55 changes: 55 additions & 0 deletions drivers/pgxv4/transaction_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package pgxv4_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
Expand Down Expand Up @@ -52,3 +54,56 @@ func TestTransaction_WithRealDB(t *testing.T) {
require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed)
require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed)
}

// transaction should release all resources if context is cancelled
// otherwise pool.Close() is blocked forever
func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) {
t.Parallel()

ctx := context.Background()

pool, err := db(ctx)
require.NoError(t, err)

defer func() {
waitPoolIsClosed(t, pool)
}()

f := pgxv4.NewDefaultFactory(pool)

ctx, cancel := context.WithCancel(ctx)

_, tr, err := f(ctx, settings.Must())
require.NoError(t, err)

require.True(t, tr.IsActive())

cancel()
}

func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) {
const checkTick = 50 * time.Millisecond
const waitDurationDeadline = 30 * time.Second

var poolClosed atomic.Bool
poolClosed.Store(false)

go func() {
pool.Close()
poolClosed.Store(true)
}()

require.Eventually(
t,
func() bool {
return poolClosed.Load()
},
waitDurationDeadline,
checkTick)

// https://github.com/jackc/pgx/issues/1641
// pool triggerHealthCheck leaves stranded goroutines for 500ms
// otherwise goleak error is triggered
const waitPoolHealthCheck = 500 * time.Millisecond
time.Sleep(waitPoolHealthCheck)
}
26 changes: 17 additions & 9 deletions drivers/pgxv5/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pgxv5

import (
"context"
"sync"

"github.com/jackc/pgx/v5"

Expand All @@ -11,10 +12,19 @@ import (

// Transaction is trm.Transaction for pgx.Tx.
type Transaction struct {
mu sync.Mutex
tx pgx.Tx
isClosed *drivers.IsClosed
}

func newDefaultTransaction(tx pgx.Tx) *Transaction {
return &Transaction{
mu: sync.Mutex{},
tx: tx,
isClosed: drivers.NewIsClosed(),
}
}

// NewTransaction creates trm.Transaction for pgx.Tx.
func NewTransaction(
ctx context.Context,
Expand All @@ -26,10 +36,7 @@ func NewTransaction(
return ctx, nil, err
}

tr := &Transaction{
tx: tx,
isClosed: drivers.NewIsClosed(),
}
tr := newDefaultTransaction(tx)

go tr.awaitDone(ctx)

Expand All @@ -43,7 +50,7 @@ func (t *Transaction) awaitDone(ctx context.Context) {

select {
case <-ctx.Done():
t.isClosed.Close()
_ = t.Rollback(ctx)
case <-t.isClosed.Closed():
}
}
Expand All @@ -60,23 +67,24 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex
return ctx, nil, err
}

tr := &Transaction{
tx: tx,
isClosed: drivers.NewIsClosed(),
}
tr := newDefaultTransaction(tx)

return ctx, tr, nil
}

// Commit the trm.Transaction.
func (t *Transaction) Commit(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
defer t.isClosed.Close()

return t.tx.Commit(ctx)
}

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
defer t.isClosed.Close()

return t.tx.Rollback(ctx)
Expand Down
55 changes: 55 additions & 0 deletions drivers/pgxv5/transaction_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package pgxv5_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -52,3 +54,56 @@ func TestTransaction_WithRealDB(t *testing.T) {
require.ErrorIs(t, tr.Commit(ctx), pgx.ErrTxClosed)
require.ErrorIs(t, tr.Rollback(ctx), pgx.ErrTxClosed)
}

// transaction should release all resources if context is cancelled
// otherwise pool.Close() is blocked forever
func TestTransaction_WithRealDB_RollbackOnContextCancel(t *testing.T) {
t.Parallel()

ctx := context.Background()

pool, err := db(ctx)
require.NoError(t, err)

defer func() {
waitPoolIsClosed(t, pool)
}()

f := pgxv5.NewDefaultFactory(pool)

ctx, cancel := context.WithCancel(ctx)

_, tr, err := f(ctx, settings.Must())
require.NoError(t, err)

require.True(t, tr.IsActive())

cancel()
}

func waitPoolIsClosed(t *testing.T, pool *pgxpool.Pool) {
const checkTick = 50 * time.Millisecond
const waitDurationDeadline = 30 * time.Second

var poolClosed atomic.Bool
poolClosed.Store(false)

go func() {
pool.Close()
poolClosed.Store(true)
}()

require.Eventually(
t,
func() bool {
return poolClosed.Load()
},
waitDurationDeadline,
checkTick)

// https://github.com/jackc/pgx/issues/1641
// pool triggerHealthCheck leaves stranded goroutines for 500ms
// otherwise goleak error is triggered
const waitPoolHealthCheck = 500 * time.Millisecond
time.Sleep(waitPoolHealthCheck)
}

0 comments on commit c4e96cb

Please sign in to comment.