Skip to content

[sql-42] accounts: Update SQL migration to iterate over buckets + ensure proper closure of SQL test stores #1084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 86 additions & 5 deletions accounts/sql_migration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package accounts

import (
"bytes"
"context"
"database/sql"
"errors"
Expand All @@ -11,6 +12,7 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/lightning-terminal/db/sqlc"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/pmezard/go-difflib/difflib"
)

Expand All @@ -24,7 +26,7 @@ var (
// MigrateAccountStoreToSQL runs the migration of all accounts and indices from
// the KV database to the SQL database. The migration is done in a single
// transaction to ensure that all accounts are migrated or none at all.
func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
func MigrateAccountStoreToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {

log.Infof("Starting migration of the KV accounts store to SQL")
Expand All @@ -47,12 +49,12 @@ func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
// migrateAccountsToSQL runs the migration of all accounts from the KV database
// to the SQL database. The migration is done in a single transaction to ensure
// that all accounts are migrated or none at all.
func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
func migrateAccountsToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {

log.Infof("Starting migration of accounts from KV to SQL")

kvAccounts, err := kvStore.Accounts(ctx)
kvAccounts, err := getBBoltAccounts(kvStore)
if err != nil {
return err
}
Expand Down Expand Up @@ -104,6 +106,51 @@ func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
return nil
}

// getBBoltAccounts is a helper function that fetches all accounts from the
// Bbolt store, by iterating directly over the buckets, without needing to
// use any public functions of the BoltStore struct.
func getBBoltAccounts(db kvdb.Backend) ([]*OffChainBalanceAccount, error) {
var accounts []*OffChainBalanceAccount
err := db.View(func(tx kvdb.RTx) error {
// This function will be called in the ForEach and receive
// the key and value of each account in the DB. The key, which
// is also the ID is not used because it is also marshaled into
// the value.
readFn := func(k, v []byte) error {
// Skip the two special purpose keys.
if bytes.Equal(k, lastAddIndexKey) ||
bytes.Equal(k, lastSettleIndexKey) {

return nil
}

// There should be no sub-buckets.
if v == nil {
return fmt.Errorf("invalid bucket structure")
}

account, err := deserializeAccount(v)
if err != nil {
return err
}

accounts = append(accounts, account)
return nil
}

// We know the bucket should exist since it's created when
// the account storage is initialized.
return tx.ReadBucket(accountBucketName).ForEach(readFn)
}, func() {
accounts = nil
})
if err != nil {
return nil, err
}

return accounts, nil
}

// migrateSingleAccountToSQL runs the migration for a single account from the
// KV database to the SQL database.
func migrateSingleAccountToSQL(ctx context.Context,
Expand Down Expand Up @@ -163,12 +210,12 @@ func migrateSingleAccountToSQL(ctx context.Context,

// migrateAccountsIndicesToSQL runs the migration for the account indices from
// the KV database to the SQL database.
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {

log.Infof("Starting migration of accounts indices from KV to SQL")

addIndex, settleIndex, err := kvStore.LastIndexes(ctx)
addIndex, settleIndex, err := getBBoltIndices(kvStore)
if errors.Is(err, ErrNoInvoiceIndexKnown) {
log.Infof("No indices found in KV store, skipping migration")
return nil
Expand Down Expand Up @@ -211,6 +258,40 @@ func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
return nil
}

// getBBoltIndices is a helper function that fetches the índices from the
// Bbolt store, by iterating directly over the buckets, without needing to
// use any public functions of the BoltStore struct.
func getBBoltIndices(db kvdb.Backend) (uint64, uint64, error) {
var (
addValue, settleValue []byte
)
err := db.View(func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
}

addValue = bucket.Get(lastAddIndexKey)
if len(addValue) == 0 {
return ErrNoInvoiceIndexKnown
}

settleValue = bucket.Get(lastSettleIndexKey)
if len(settleValue) == 0 {
return ErrNoInvoiceIndexKnown
}

return nil
}, func() {
addValue, settleValue = nil, nil
})
if err != nil {
return 0, 0, err
}

return byteOrder.Uint64(addValue), byteOrder.Uint64(settleValue), nil
}

// overrideAccountTimeZone overrides the time zone of the account to the local
// time zone and chops off the nanosecond part for comparison. This is needed
// because KV database stores times as-is which as an unwanted side effect would
Expand Down
5 changes: 1 addition & 4 deletions accounts/sql_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ func TestAccountStoreMigration(t *testing.T) {
*db.TransactionExecutor[SQLQueries]) {

testDBStore := NewTestDB(t, clock)
t.Cleanup(func() {
require.NoError(t, testDBStore.Close())
})

store, ok := testDBStore.(*SQLStore)
require.True(t, ok)
Expand Down Expand Up @@ -344,7 +341,7 @@ func TestAccountStoreMigration(t *testing.T) {
err = txEx.ExecTx(ctx, &opts,
func(tx SQLQueries) error {
return MigrateAccountStoreToSQL(
ctx, kvStore, tx,
ctx, kvStore.db, tx,
)
},
)
Expand Down
4 changes: 2 additions & 2 deletions accounts/test_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ var ErrDBClosed = errors.New("database is closed")

// NewTestDB is a helper function that creates an SQLStore database for testing.
func NewTestDB(t *testing.T, clock clock.Clock) Store {
return NewSQLStore(db.NewTestPostgresDB(t).BaseDB, clock)
return createStore(t, db.NewTestPostgresDB(t).BaseDB, clock)
}

// NewTestDBFromPath is a helper function that creates a new SQLStore with a
// connection to an existing postgres database for testing.
func NewTestDBFromPath(t *testing.T, dbPath string,
clock clock.Clock) Store {

return NewSQLStore(db.NewTestPostgresDB(t).BaseDB, clock)
return createStore(t, db.NewTestPostgresDB(t).BaseDB, clock)
}
22 changes: 22 additions & 0 deletions accounts/test_sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build test_db_postgres || test_db_sqlite

package accounts

import (
"testing"

"github.com/lightninglabs/lightning-terminal/db"
"github.com/lightningnetwork/lnd/clock"
"github.com/stretchr/testify/require"
)

// createStore is a helper function that creates a new SQLStore and ensure that
// it is closed when during the test cleanup.
func createStore(t *testing.T, sqlDB *db.BaseDB, clock clock.Clock) *SQLStore {
store := NewSQLStore(sqlDB, clock)
t.Cleanup(func() {
require.NoError(t, store.Close())
})

return store
}
6 changes: 3 additions & 3 deletions accounts/test_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ var ErrDBClosed = errors.New("database is closed")

// NewTestDB is a helper function that creates an SQLStore database for testing.
func NewTestDB(t *testing.T, clock clock.Clock) Store {
return NewSQLStore(db.NewTestSqliteDB(t).BaseDB, clock)
return createStore(t, db.NewTestSqliteDB(t).BaseDB, clock)
}

// NewTestDBFromPath is a helper function that creates a new SQLStore with a
// connection to an existing SQL database for testing.
func NewTestDBFromPath(t *testing.T, dbPath string,
clock clock.Clock) Store {

return NewSQLStore(
db.NewTestSqliteDbHandleFromPath(t, dbPath).BaseDB, clock,
return createStore(
t, db.NewTestSqliteDbHandleFromPath(t, dbPath).BaseDB, clock,
)
}
Loading