Skip to content

Support for multi-insert of transactions and movements #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions token/services/auditdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,14 @@ func (d *StoreService) Append(ctx context.Context, req tokenRequest) error {
w.Rollback()
return errors.WithMessagef(err, "append token request for txid [%s] failed", record.Anchor)
}
for _, mv := range mov {
if err := w.AddMovement(ctx, &mv); err != nil {
w.Rollback()
return errors.WithMessagef(err, "append sent movements for txid [%s] failed", record.Anchor)
}
if err := w.AddMovement(ctx, mov...); err != nil {
w.Rollback()
return errors.WithMessagef(err, "append sent movements for txid [%s] failed", record.Anchor)
}
for _, tx := range txs {
if err := w.AddTransaction(ctx, &tx); err != nil {
w.Rollback()
return errors.WithMessagef(err, "append transactions for txid [%s] failed", record.Anchor)
}

if err := w.AddTransaction(ctx, txs...); err != nil {
w.Rollback()
return errors.WithMessagef(err, "append transactions for txid [%s] failed", record.Anchor)
}
if err := w.Commit(); err != nil {
return errors.WithMessagef(err, "committing tx for txid [%s] failed", record.Anchor)
Expand Down
44 changes: 22 additions & 22 deletions token/services/db/dbtest/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TFailsIfRequestDoesNotExist(t *testing.T, db driver.TokenTransactionStore)
Status: driver.Pending,
}
w, _ := db.BeginAtomicWrite()
err := w.AddTransaction(ctx, &tx)
err := w.AddTransaction(ctx, tx)
assert.Error(t, err)
assert.True(t, errors.Is(err, driver.ErrTokenRequestDoesNotExist))
w.Rollback()
Expand All @@ -88,7 +88,7 @@ func TFailsIfRequestDoesNotExist(t *testing.T, db driver.TokenTransactionStore)
w.Rollback()

w, _ = db.BeginAtomicWrite()
err = w.AddMovement(ctx, &mv)
err = w.AddMovement(ctx, mv)
assert.Error(t, err)
assert.True(t, errors.Is(err, driver.ErrTokenRequestDoesNotExist))
w.Rollback()
Expand Down Expand Up @@ -117,9 +117,9 @@ func TStatus(t *testing.T, db driver.TokenTransactionStore) {
w, err := db.BeginAtomicWrite()
assert.NoError(t, err, "begin")
assert.NoError(t, w.AddTokenRequest(ctx, "tx1", []byte("request"), map[string][]byte{}, nil, driver2.PPHash("tr")), "add token request")
assert.NoError(t, w.AddTransaction(ctx, &tx))
assert.NoError(t, w.AddTransaction(ctx, tx))
assert.NoError(t, w.AddValidationRecord(ctx, "tx1", nil), "add validation record")
assert.NoError(t, w.AddMovement(ctx, &mv))
assert.NoError(t, w.AddMovement(ctx, mv))
assert.NoError(t, w.Commit())

s, mess, err := db.GetStatus(ctx, "tx1")
Expand Down Expand Up @@ -157,7 +157,7 @@ func TStoresTimestamp(t *testing.T, db driver.TokenTransactionStore) {
w, err := db.BeginAtomicWrite()
assert.NoError(t, err)
assert.NoError(t, w.AddTokenRequest(ctx, "tx1", []byte(""), map[string][]byte{}, nil, driver2.PPHash("tr")))
assert.NoError(t, w.AddTransaction(ctx, &driver.TransactionRecord{
assert.NoError(t, w.AddTransaction(ctx, driver.TransactionRecord{
TxID: "tx1",
ActionType: driver.Transfer,
SenderEID: "bob",
Expand Down Expand Up @@ -190,19 +190,19 @@ func TMovements(t *testing.T, db driver.TokenTransactionStore) {
assert.NoError(t, w.AddTokenRequest(ctx, "0", []byte{}, map[string][]byte{}, nil, driver2.PPHash("tr")))
assert.NoError(t, w.AddTokenRequest(ctx, "1", []byte{}, map[string][]byte{}, nil, driver2.PPHash("tr")))
assert.NoError(t, w.AddTokenRequest(ctx, "2", []byte{}, map[string][]byte{}, nil, driver2.PPHash("tr")))
assert.NoError(t, w.AddMovement(ctx, &driver.MovementRecord{
assert.NoError(t, w.AddMovement(ctx, driver.MovementRecord{
TxID: "0",
EnrollmentID: "alice",
TokenType: "magic",
Amount: big.NewInt(10),
}))
assert.NoError(t, w.AddMovement(ctx, &driver.MovementRecord{
assert.NoError(t, w.AddMovement(ctx, driver.MovementRecord{
TxID: "1",
EnrollmentID: "alice",
TokenType: "magic",
Amount: big.NewInt(20),
}))
assert.NoError(t, w.AddMovement(ctx, &driver.MovementRecord{
assert.NoError(t, w.AddMovement(ctx, driver.MovementRecord{
TxID: "2",
EnrollmentID: "alice",
TokenType: "magic",
Expand Down Expand Up @@ -251,14 +251,14 @@ func TMovements(t *testing.T, db driver.TokenTransactionStore) {

func TTransaction(t *testing.T, db driver.TokenTransactionStore) {
ctx := context.Background()
var txs []*driver.TransactionRecord
var txs []driver.TransactionRecord

t0 := time.Now()
lastYear := t0.AddDate(-1, 0, 0)

w, err := db.BeginAtomicWrite()
assert.NoError(t, err)
tr1 := &driver.TransactionRecord{
tr1 := driver.TransactionRecord{
TxID: fmt.Sprintf("tx%d", 99),
ActionType: driver.Transfer,
SenderEID: "bob",
Expand All @@ -276,7 +276,7 @@ func TTransaction(t *testing.T, db driver.TokenTransactionStore) {
for i := 0; i < 20; i++ {
now := time.Now()
ctx := context.Background()
tr := &driver.TransactionRecord{
tr := driver.TransactionRecord{
TxID: fmt.Sprintf("tx%d", i),
ActionType: driver.Issue,
SenderEID: "",
Expand Down Expand Up @@ -308,7 +308,7 @@ func TTransaction(t *testing.T, db driver.TokenTransactionStore) {
for _, exp := range txs {
act, err := it.Items.Next()
assert.NoError(t, err)
assertTxEqual(t, exp, act)
assertTxEqual(t, &exp, act)
}
it.Items.Close()

Expand All @@ -321,7 +321,7 @@ func TTransaction(t *testing.T, db driver.TokenTransactionStore) {
// find 1 transaction from last year
tr, err := it.Items.Next()
assert.NoError(t, err)
assertTxEqual(t, tr1, tr)
assertTxEqual(t, &tr1, tr)

// find no other transactions
tr, err = it.Items.Next()
Expand Down Expand Up @@ -350,7 +350,7 @@ func TTransaction(t *testing.T, db driver.TokenTransactionStore) {
// exclude to self
w, err = db.BeginAtomicWrite()
assert.NoError(t, err)
tr1 = &driver.TransactionRecord{
tr1 = driver.TransactionRecord{
TxID: "1234",
ActionType: driver.Transfer,
SenderEID: "alice",
Expand Down Expand Up @@ -505,7 +505,7 @@ func TAllowsSameTxID(t *testing.T, db driver.TokenTransactionStore) {
ctx := context.Background()

// bob sends 10 to alice
tr1 := &driver.TransactionRecord{
tr1 := driver.TransactionRecord{
TxID: "1",
ActionType: driver.Transfer,
SenderEID: "bob",
Expand All @@ -516,7 +516,7 @@ func TAllowsSameTxID(t *testing.T, db driver.TokenTransactionStore) {
Timestamp: time.Now(),
}
// 1 is sent back to bobs wallet as change
tr2 := &driver.TransactionRecord{
tr2 := driver.TransactionRecord{
TxID: "1",
ActionType: driver.Transfer,
SenderEID: "bob",
Expand All @@ -535,8 +535,8 @@ func TAllowsSameTxID(t *testing.T, db driver.TokenTransactionStore) {

txs := getTransactions(t, db, driver.QueryTransactionsParams{})
assert.Len(t, txs, 2)
assertTxEqual(t, tr1, txs[0])
assertTxEqual(t, tr2, txs[1])
assertTxEqual(t, &tr1, txs[0])
assertTxEqual(t, &tr2, txs[1])
}

func TRollback(t *testing.T, db driver.TokenTransactionStore) {
Expand All @@ -545,14 +545,14 @@ func TRollback(t *testing.T, db driver.TokenTransactionStore) {
assert.NoError(t, err)
assert.NoError(t, w.AddTokenRequest(ctx, "1", []byte("arbitrary bytes"), map[string][]byte{}, nil, driver2.PPHash("tr")))

mr1 := &driver.MovementRecord{
mr1 := driver.MovementRecord{
TxID: "1",
EnrollmentID: "bob",
TokenType: "magic",
Amount: big.NewInt(10),
Status: driver.Pending,
}
tr1 := &driver.TransactionRecord{
tr1 := driver.TransactionRecord{
TxID: "1",
ActionType: driver.Transfer,
SenderEID: "bob",
Expand Down Expand Up @@ -791,7 +791,7 @@ func TTransactionQueries(t *testing.T, db driver.TokenTransactionStore) {
if r.TxID != previous {
assert.NoError(t, w.AddTokenRequest(ctx, r.TxID, []byte{}, map[string][]byte{}, nil, driver2.PPHash("tr")))
}
assert.NoError(t, w.AddTransaction(ctx, &r))
assert.NoError(t, w.AddTransaction(ctx, r))
previous = r.TxID
}
assert.NoError(t, w.Commit())
Expand Down Expand Up @@ -939,7 +939,7 @@ func createTestTransaction(t *testing.T, db driver.TokenTransactionStore, txID s
if err := w.AddTokenRequest(context.Background(), txID, []byte{}, map[string][]byte{}, nil, driver2.PPHash("tr")); err != nil {
t.Fatalf("error creating token request while trying to test something else: %s", err)
}
tr1 := &driver.TransactionRecord{
tr1 := driver.TransactionRecord{
TxID: txID,
ActionType: driver.Transfer,
SenderEID: "bob",
Expand Down
5 changes: 5 additions & 0 deletions token/services/db/driver/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package driver

import (
"fmt"
"math/big"
"strconv"
"strings"
Expand Down Expand Up @@ -95,6 +96,10 @@ type MovementRecord struct {
Status TxStatus
}

func (r *MovementRecord) String() string {
return fmt.Sprintf("[%s:%s:%s:%d:%d]", r.TxID, r.EnrollmentID, r.TokenType, r.Amount.Int64(), r.Status)
}

// TransactionRecord is a more finer-grained version of a movement record.
// Given a Token Transaction, for each token action in the Token Request,
// a transaction record is created for each unique enrollment ID found in the outputs.
Expand Down
4 changes: 2 additions & 2 deletions token/services/db/driver/ttx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ type AtomicWrite interface {
// AddMovement adds a movement record to the database transaction.
// Each token transaction can be seen as a list of movements.
// This operation _requires_ a TokenRequest with the same tx_id to exist
AddMovement(ctx context.Context, record *MovementRecord) error
AddMovement(ctx context.Context, records ...MovementRecord) error

// AddTransaction adds a transaction record to the database transaction.
// This operation _requires_ a TokenRequest with the same tx_id to exist
AddTransaction(ctx context.Context, record *TransactionRecord) error
AddTransaction(ctx context.Context, records ...TransactionRecord) error

// AddValidationRecord adds a new validation records for the given params
// This operation _requires_ a TokenRequest with the same tx_id to exist
Expand Down
54 changes: 30 additions & 24 deletions token/services/db/sql/common/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,27 +434,29 @@ func (w *AtomicWrite) Rollback() {
w.txn = nil
}

func (w *AtomicWrite) AddTransaction(ctx context.Context, r *driver.TransactionRecord) error {
logger.Debugf("adding transaction record [%s:%d,%s:%s:%s:%s]", r.TxID, r.ActionType, r.TokenType, r.SenderEID, r.RecipientEID, r.Amount)
func (w *AtomicWrite) AddTransaction(ctx context.Context, rs ...driver.TransactionRecord) error {
if w.txn == nil {
return errors.New("no db transaction in progress")
}
if !r.Amount.IsInt64() {
return errors.New("the database driver does not support larger values than int64")
}
amount := r.Amount.Int64()
actionType := int(r.ActionType)
id, err := uuid.GenerateUUID()
if err != nil {
return errors.Wrapf(err, "error generating uuid")
rows := make([]common3.Tuple, len(rs))
for i, r := range rs {
logger.Debugf("adding transaction record [%s:%d,%s:%s:%s:%s]", r.TxID, r.ActionType, r.TokenType, r.SenderEID, r.RecipientEID, r.Amount)
if !r.Amount.IsInt64() {
return errors.New("the database driver does not support larger values than int64")
}
id, err := uuid.GenerateUUID()
if err != nil {
return errors.Wrapf(err, "error generating uuid")
}
rows[i] = common3.Tuple{id, r.TxID, int(r.ActionType), r.SenderEID, r.RecipientEID, r.TokenType, r.Amount.Int64(), r.Timestamp.UTC()}
}

query, args := q.InsertInto(w.table.Transactions).
Fields("id", "tx_id", "action_type", "sender_eid", "recipient_eid", "token_type", "amount", "stored_at").
Row(id, r.TxID, actionType, r.SenderEID, r.RecipientEID, r.TokenType, amount, r.Timestamp.UTC()).
Rows(rows).
Format()
logger.Debug(query, args)
_, err = w.txn.ExecContext(ctx, query, args...)
_, err := w.txn.ExecContext(ctx, query, args...)

return ttxDBError(err)
}
Expand Down Expand Up @@ -489,28 +491,32 @@ func (w *AtomicWrite) AddTokenRequest(ctx context.Context, txID string, tr []byt
return ttxDBError(err)
}

func (w *AtomicWrite) AddMovement(ctx context.Context, r *driver.MovementRecord) error {
logger.Debugf("adding movement record [%s:%s:%s:%d:%s]", r.TxID, r.EnrollmentID, r.TokenType, r.Amount.Int64(), r.Status)
func (w *AtomicWrite) AddMovement(ctx context.Context, rs ...driver.MovementRecord) error {
if w.txn == nil {
return errors.New("no db transaction in progress")
}
if !r.Amount.IsInt64() {
return errors.New("the database driver does not support larger values than int64")
}
amount := r.Amount.Int64()

id, err := uuid.GenerateUUID()
if err != nil {
return errors.Wrapf(err, "error generating uuid")
}
now := time.Now().UTC()
rows := make([]common3.Tuple, len(rs))
for i, r := range rs {
logger.Debugf("adding movement record [%s]", r)

if !r.Amount.IsInt64() {
return errors.New("the database driver does not support larger values than int64")
}
id, err := uuid.GenerateUUID()
if err != nil {
return errors.Wrapf(err, "error generating uuid")
}
rows[i] = common3.Tuple{id, r.TxID, r.EnrollmentID, r.TokenType, r.Amount.Int64(), now}
}

query, args := q.InsertInto(w.table.Movements).
Fields("id", "tx_id", "enrollment_id", "token_type", "amount", "stored_at").
Row(id, r.TxID, r.EnrollmentID, r.TokenType, amount, now).
Rows(rows).
Format()
logger.Debug(query, args)
_, err = w.txn.ExecContext(ctx, query, args...)
_, err := w.txn.ExecContext(ctx, query, args...)

return ttxDBError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion token/services/ttxdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (d *StoreService) AppendTransactionRecord(ctx context.Context, req *token.R
return errors.WithMessagef(err, "append token request for txid [%s] failed", record.Anchor)
}
for _, tx := range txs {
if err := w.AddTransaction(ctx, &tx); err != nil {
if err := w.AddTransaction(ctx, tx); err != nil {
w.Rollback()
return errors.WithMessagef(err, "append transactions for txid [%s] failed", record.Anchor)
}
Expand Down
Loading