-
Notifications
You must be signed in to change notification settings - Fork 102
[sql-33] session: add migration code from kvdb to SQL #1051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ViktorTigerstrom
merged 4 commits into
lightninglabs:master
from
ViktorTigerstrom:2025-04-migrate-sessions
Jun 12, 2025
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
b48e40f
session: update NewTestDB funcs to return Store
ViktorTigerstrom 6c4c6a4
session: ensure that test SQL store is closed
ViktorTigerstrom 8f1aa99
session: add migration code from kvdb to SQL
ViktorTigerstrom 2007d53
session: add randomized session migration test
ViktorTigerstrom File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,396 @@ | ||
package session | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"database/sql" | ||
"errors" | ||
"fmt" | ||
"reflect" | ||
"time" | ||
|
||
"github.com/davecgh/go-spew/spew" | ||
"github.com/lightninglabs/lightning-terminal/accounts" | ||
"github.com/lightninglabs/lightning-terminal/db/sqlc" | ||
"github.com/lightningnetwork/lnd/sqldb" | ||
"github.com/pmezard/go-difflib/difflib" | ||
"go.etcd.io/bbolt" | ||
) | ||
|
||
var ( | ||
// ErrMigrationMismatch is returned when the migrated session does not | ||
// match the original session. | ||
ErrMigrationMismatch = fmt.Errorf("migrated session does not match " + | ||
"original session") | ||
) | ||
|
||
// MigrateSessionStoreToSQL runs the migration of all sessions from the KV | ||
// database to the SQL database. The migration is done in a single transaction | ||
// to ensure that all sessions are migrated or none at all. | ||
// | ||
// NOTE: As sessions may contain linked accounts, the accounts sql migration | ||
// MUST be run prior to this migration. | ||
func MigrateSessionStoreToSQL(ctx context.Context, kvStore *bbolt.DB, | ||
tx SQLQueries) error { | ||
|
||
log.Infof("Starting migration of the KV sessions store to SQL") | ||
|
||
kvSessions, err := getBBoltSessions(kvStore) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// If sessions are linked to a group, we must insert the initial session | ||
// of each group before the other sessions in that group. This ensures | ||
// we can retrieve the SQL group ID when inserting the remaining | ||
// sessions. Therefore, we first insert all initial group sessions, | ||
// allowing us to fetch the group IDs and insert the rest of the | ||
// sessions afterward. | ||
// We therefore filter out the initial sessions first, and then migrate | ||
// them prior to the rest of the sessions. | ||
var ( | ||
initialGroupSessions []*Session | ||
linkedSessions []*Session | ||
) | ||
|
||
for _, kvSession := range kvSessions { | ||
if kvSession.GroupID == kvSession.ID { | ||
initialGroupSessions = append( | ||
initialGroupSessions, kvSession, | ||
) | ||
} else { | ||
linkedSessions = append(linkedSessions, kvSession) | ||
} | ||
} | ||
|
||
err = migrateSessionsToSQLAndValidate(ctx, tx, initialGroupSessions) | ||
if err != nil { | ||
return fmt.Errorf("migration of non-linked session failed: %w", | ||
err) | ||
} | ||
|
||
err = migrateSessionsToSQLAndValidate(ctx, tx, linkedSessions) | ||
if err != nil { | ||
return fmt.Errorf("migration of linked session failed: %w", err) | ||
} | ||
|
||
total := len(initialGroupSessions) + len(linkedSessions) | ||
log.Infof("All sessions migrated from KV to SQL. Total number of "+ | ||
"sessions migrated: %d", total) | ||
|
||
return nil | ||
} | ||
|
||
// getBBoltSessions is a helper function that fetches all sessions from the | ||
// Bbolt store, by iterating directly over the buckets, without needing to | ||
// use any public functions of the BoltStore struct. | ||
func getBBoltSessions(db *bbolt.DB) ([]*Session, error) { | ||
var sessions []*Session | ||
|
||
err := db.View(func(tx *bbolt.Tx) error { | ||
sessionBucket, err := getBucket(tx, sessionBucketKey) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return sessionBucket.ForEach(func(k, v []byte) error { | ||
// We'll also get buckets here, skip those (identified | ||
// by nil value). | ||
if v == nil { | ||
return nil | ||
} | ||
|
||
session, err := DeserializeSession(bytes.NewReader(v)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
sessions = append(sessions, session) | ||
|
||
return nil | ||
}) | ||
}) | ||
|
||
return sessions, err | ||
} | ||
|
||
// migrateSessionsToSQLAndValidate runs the migration for the passed sessions | ||
// from the KV database to the SQL database, and validates that the migrated | ||
// sessions match the original sessions. | ||
func migrateSessionsToSQLAndValidate(ctx context.Context, | ||
tx SQLQueries, kvSessions []*Session) error { | ||
|
||
for _, kvSession := range kvSessions { | ||
err := migrateSingleSessionToSQL(ctx, tx, kvSession) | ||
if err != nil { | ||
return fmt.Errorf("unable to migrate session(%v): %w", | ||
kvSession.ID, err) | ||
} | ||
|
||
// Validate that the session was correctly migrated and matches | ||
// the original session in the kv store. | ||
sqlSess, err := tx.GetSessionByAlias(ctx, kvSession.ID[:]) | ||
if err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
err = ErrSessionNotFound | ||
} | ||
return fmt.Errorf("unable to get migrated session "+ | ||
"from sql store: %w", err) | ||
} | ||
|
||
migratedSession, err := unmarshalSession(ctx, tx, sqlSess) | ||
if err != nil { | ||
return fmt.Errorf("unable to unmarshal migrated "+ | ||
"session: %w", err) | ||
} | ||
|
||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
overrideSessionTimeZone(kvSession) | ||
overrideSessionTimeZone(migratedSession) | ||
overrideMacaroonRecipe(kvSession, migratedSession) | ||
|
||
if !reflect.DeepEqual(kvSession, migratedSession) { | ||
diff := difflib.UnifiedDiff{ | ||
A: difflib.SplitLines( | ||
spew.Sdump(kvSession), | ||
), | ||
B: difflib.SplitLines( | ||
spew.Sdump(migratedSession), | ||
), | ||
FromFile: "Expected", | ||
FromDate: "", | ||
ToFile: "Actual", | ||
ToDate: "", | ||
Context: 3, | ||
} | ||
diffText, _ := difflib.GetUnifiedDiffString(diff) | ||
|
||
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch, | ||
kvSession.ID, diffText) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// migrateSingleSessionToSQL runs the migration for a single session from the | ||
// KV database to the SQL database. Note that if the session links to an | ||
// account, the linked accounts store MUST have been migrated before that | ||
// session is migrated. | ||
func migrateSingleSessionToSQL(ctx context.Context, | ||
tx SQLQueries, session *Session) error { | ||
|
||
var ( | ||
acctID sql.NullInt64 | ||
err error | ||
remotePubKey []byte | ||
) | ||
|
||
session.AccountID.WhenSome(func(alias accounts.AccountID) { | ||
// Fetch the SQL ID for the account from the SQL store. | ||
var acctAlias int64 | ||
acctAlias, err = alias.ToInt64() | ||
if err != nil { | ||
return | ||
} | ||
|
||
var acctDBID int64 | ||
acctDBID, err = tx.GetAccountIDByAlias(ctx, acctAlias) | ||
if errors.Is(err, sql.ErrNoRows) { | ||
err = accounts.ErrAccNotFound | ||
return | ||
} else if err != nil { | ||
return | ||
} | ||
|
||
acctID = sqldb.SQLInt64(acctDBID) | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if session.RemotePublicKey != nil { | ||
remotePubKey = session.RemotePublicKey.SerializeCompressed() | ||
} | ||
|
||
// Proceed to insert the session into the sql db. | ||
sqlId, err := tx.InsertSession(ctx, sqlc.InsertSessionParams{ | ||
Alias: session.ID[:], | ||
Label: session.Label, | ||
State: int16(session.State), | ||
Type: int16(session.Type), | ||
Expiry: session.Expiry.UTC(), | ||
CreatedAt: session.CreatedAt.UTC(), | ||
ServerAddress: session.ServerAddr, | ||
DevServer: session.DevServer, | ||
MacaroonRootKey: int64(session.MacaroonRootKey), | ||
PairingSecret: session.PairingSecret[:], | ||
LocalPrivateKey: session.LocalPrivateKey.Serialize(), | ||
LocalPublicKey: session.LocalPublicKey.SerializeCompressed(), | ||
RemotePublicKey: remotePubKey, | ||
Privacy: session.WithPrivacyMapper, | ||
AccountID: acctID, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Since the InsertSession query doesn't support that we set the revoked | ||
// field during the insert, we need to set the field after the session | ||
// has been created. | ||
ViktorTigerstrom marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if !session.RevokedAt.IsZero() { | ||
err = tx.SetSessionRevokedAt( | ||
ctx, sqlc.SetSessionRevokedAtParams{ | ||
ID: sqlId, | ||
RevokedAt: sqldb.SQLTime( | ||
session.RevokedAt.UTC(), | ||
), | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// After the session has been inserted, we need to update the session | ||
// with the group ID if it is linked to a group. We need to do this | ||
// after the session has been inserted, because the group ID can be the | ||
// session itself, and therefore the SQL id for the session won't exist | ||
// prior to inserting the session. | ||
ViktorTigerstrom marked this conversation as resolved.
Show resolved
Hide resolved
|
||
groupID, err := tx.GetSessionIDByAlias(ctx, session.GroupID[:]) | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return ErrUnknownGroup | ||
} else if err != nil { | ||
return fmt.Errorf("unable to fetch group(%x): %w", | ||
session.GroupID[:], err) | ||
} | ||
|
||
// Now lets set the group ID for the session. | ||
err = tx.SetSessionGroupID(ctx, sqlc.SetSessionGroupIDParams{ | ||
ID: sqlId, | ||
GroupID: sqldb.SQLInt64(groupID), | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("unable to set group Alias: %w", err) | ||
} | ||
|
||
// Once we have the sqlID for the session, we can proceed to insert rows | ||
// into the linked child tables. | ||
if session.MacaroonRecipe != nil { | ||
// We start by inserting the macaroon permissions. | ||
for _, sessionPerm := range session.MacaroonRecipe.Permissions { | ||
err = tx.InsertSessionMacaroonPermission( | ||
ctx, sqlc.InsertSessionMacaroonPermissionParams{ | ||
SessionID: sqlId, | ||
Entity: sessionPerm.Entity, | ||
Action: sessionPerm.Action, | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// Next we insert the macaroon caveats. | ||
for _, caveat := range session.MacaroonRecipe.Caveats { | ||
err = tx.InsertSessionMacaroonCaveat( | ||
ellemouton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx, sqlc.InsertSessionMacaroonCaveatParams{ | ||
SessionID: sqlId, | ||
CaveatID: caveat.Id, | ||
VerificationID: caveat.VerificationId, | ||
Location: sqldb.SQLStr( | ||
caveat.Location, | ||
), | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
// That's followed by the feature config. | ||
if session.FeatureConfig != nil { | ||
for featureName, config := range *session.FeatureConfig { | ||
err = tx.InsertSessionFeatureConfig( | ||
ctx, sqlc.InsertSessionFeatureConfigParams{ | ||
SessionID: sqlId, | ||
FeatureName: featureName, | ||
Config: config, | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
// Finally we insert the privacy flags. | ||
for _, privacyFlag := range session.PrivacyFlags { | ||
err = tx.InsertSessionPrivacyFlag( | ||
ctx, sqlc.InsertSessionPrivacyFlagParams{ | ||
SessionID: sqlId, | ||
Flag: int32(privacyFlag), | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// overrideSessionTimeZone overrides the time zone of the session to the local | ||
// time zone and chops off the nanosecond part for comparison. This is needed | ||
// because KV database stores times as-is which as an unwanted side effect would | ||
// fail migration due to time comparison expecting both the original and | ||
// migrated sessions to be in the same local time zone and in microsecond | ||
// precision. Note that PostgresSQL stores times in microsecond precision while | ||
// SQLite can store times in nanosecond precision if using TEXT storage class. | ||
func overrideSessionTimeZone(session *Session) { | ||
fixTime := func(t time.Time) time.Time { | ||
return t.In(time.Local).Truncate(time.Microsecond) | ||
} | ||
|
||
if !session.Expiry.IsZero() { | ||
session.Expiry = fixTime(session.Expiry) | ||
} | ||
|
||
if !session.CreatedAt.IsZero() { | ||
session.CreatedAt = fixTime(session.CreatedAt) | ||
} | ||
|
||
if !session.RevokedAt.IsZero() { | ||
session.RevokedAt = fixTime(session.RevokedAt) | ||
} | ||
} | ||
|
||
// overrideMacaroonRecipe overrides the MacaroonRecipe for the SQL session in a | ||
// certain scenario: | ||
// In the bbolt store, a session can have a non-nil macaroon struct, despite | ||
// both the permissions and caveats being nil. There is no way to represent this | ||
// in the SQL store, as the macaroon permissions and caveats are separate | ||
// tables. Therefore, in the scenario where a MacaroonRecipe exists for the | ||
// bbolt version, but both the permissions and caveats are nil, we override the | ||
// MacaroonRecipe for the SQL version and set it to a MacaroonRecipe with | ||
// nil permissions and caveats. This is needed to ensure that the deep equals | ||
// check in the migration validation does not fail in this scenario. | ||
// Additionally, if either the permissions or caveats aren't set, for the | ||
// MacaroonRecipe, that is represented as empty array in the SQL store, but | ||
// as nil in the bbolt store. Therefore, we also override the permissions | ||
// or caveats to nil for the migrated session in that scenario, so that the | ||
// deep equals check does not fail in this scenario either. | ||
func overrideMacaroonRecipe(kvSession *Session, migratedSession *Session) { | ||
if kvSession.MacaroonRecipe != nil { | ||
kvPerms := kvSession.MacaroonRecipe.Permissions | ||
kvCaveats := kvSession.MacaroonRecipe.Caveats | ||
|
||
if kvPerms == nil && kvCaveats == nil { | ||
migratedSession.MacaroonRecipe = &MacaroonRecipe{} | ||
bitromortac marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else if kvPerms == nil { | ||
migratedSession.MacaroonRecipe.Permissions = nil | ||
} else if kvCaveats == nil { | ||
migratedSession.MacaroonRecipe.Caveats = nil | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting - are we not able to rely on the fact that sessions are stored in order in kvdb? ie, if we get a session with a group ID set, that will always either point to the session at hand or a session we've already handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ie, i think we can just:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or did you actually run into a scenario where these were not in order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm so the
listSessions
for the kvdb returns the sessions sorted by theCreatedAt
field. So given that this field actually contains timestamps in the correct order, your suggestion should work. But I see 2 potential scenarios where that wouldn't be true:CreatedAt
timestamp prior to the initial session they are linking to.clock.Clock
forlitd
, that we realised that this could have the side effect that the new timestamps would potentially get changed by +- a few hours unless I'm remembering this incorrectly? In that scenario, for some edge case users, they could have created the initial session before updating to the newlitd
version that added theclock.Clock
implementation, and then linked a session to it right after which could then have an earlierCreatedAt
timestamp than the initial session.As I don't see any real cons with my current approach, other than it adding some more code complexity, I think it's a worth tradeoff than risking the migrations failing for such edge cases. Do you agree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd opt for not using public methods like ListAllSessions for the migration. Rather let the migration manually get to the buckets/sub-buckets that it needs. We want to be able to change how ListAllSessions works without affecting the migration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notice that the LND invoice migration does the same: takes a raw kvdb.Backend and *sqlc.Queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
im also more in favour of doing it this way cause then the IDs that the sessions have will be more intuitive as then (as long as the user didnt do anything strange) , the created at times will mostly be increasing at the IDs increase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool thanks! I updated this PR to instead pass a
*bbolt.DB
and directly iterate over the buckets, rather than using the public functions of theBoltStore
. I've done the same for the accounts migration as well now with:#1084
I've also looked through the previous migrations of the sessions kvdb as discussed offline, and none of them should be able to effect the order of the sessions entires in the kvdb store. However as also discussed offline, I didn't update the logic in the migrations, so that non-linked sessions are still migrated prior to linked sessions. I'm open to changing this though if you believe it's worth updating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, i thought the offline conclusion was the opposite 😅 cause i think it is just nice to have that the order in the DB is (at least on a best effort basis) the same as the order of creation.
Not completely blocking but i'd defs prefer it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I should have been aware of this earlier! Since we insert the sessions in the kvdb store like this into the bucket:
Where the bucket is the bucket for the
sessionBucketKey
and the result of thegetSessionKey
will besession.LocalPublicKey.SerializeCompressed()
, the order the sessions when iterating over thesessionBucketKey
will actually be the lexicographical order of the serializedLocalPublicKey
, and not in insertion order of the sessions.Therefore there's no way to iterate over them in "insertion order", unless you know some way I'm unaware of :)? The closest we can get is to do what the kvdb store
listSessions
function does, which sorts the sessions by theCreatedAt
timestamp, but that has the issue of what I brought up earlier with: #1051 (comment)Therefore I think our best option that doesn't over complicate the code is to keep the migration code as what I've implemented today, i.e. first insert non-linked sessions, followed by inserting linked sessions?