75 changes: 75 additions & 0 deletions graph/db/graph_sql_test.go
@@ -0,0 +1,75 @@
//go:build test_db_postgres || test_db_sqlite

package graphdb

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestNodeIsPublicCacheInvalidation ensures that we correctly invalidate the
// cache we use when determining whether a node is public or not.
func TestNodeIsPublicCacheInvalidation(t *testing.T) {
t.Parallel()
ctx := t.Context()

graph := MakeTestGraph(t)

node1 := createTestVertex(t)
node2 := createTestVertex(t)

require.NoError(t, graph.AddNode(ctx, node1))
require.NoError(t, graph.AddNode(ctx, node2))

edge, _ := createEdge(10, 0, 0, 0, node1, node2)
require.NoError(t, graph.AddChannelEdge(ctx, &edge))

// The first IsPublicNode call should populate the cache.
isPublic1, err := graph.IsPublicNode(node1.PubKeyBytes)
require.NoError(t, err)
require.True(t, isPublic1)

// Test invalidation scenarios:

// 1. DeleteChannelEdges:
// The channel added above being public should now be cached, but we
// expect DeleteChannelEdges to invalidate the cache for both nodes;
// otherwise the IsPublicNode calls below would hit the stale cache.
err = graph.DeleteChannelEdges(false, true, edge.ChannelID)
require.NoError(t, err)
isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
require.NoError(t, err)
require.False(t, isPublic1)

isPublic2, err := graph.IsPublicNode(node2.PubKeyBytes)
require.NoError(t, err)
require.False(t, isPublic2)

// 2. AddChannelEdge:
// We know that the last two `IsPublicNode` calls above cached our
// nodes with `isPublic` = false. Adding a new channel edge should
// invalidate the cache such that subsequent `IsPublicNode` calls
// return `true`.
edge2, _ := createEdge(10, 1, 0, 1, node1, node2)
require.NoError(t, graph.AddChannelEdge(ctx, &edge2))
isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
require.NoError(t, err)
require.True(t, isPublic1)

isPublic2, err = graph.IsPublicNode(node2.PubKeyBytes)
require.NoError(t, err)
require.True(t, isPublic2)

// 3. DeleteNode:
// Again, the last two `IsPublicNode` calls should have cached both
// nodes as public. Deleting a node should invalidate its entry so that
// the next call returns `false`.
//
// NOTE: We don't get an error calling `IsPublicNode` for a deleted
// node because the SQL query checks for the existence of public nodes
// rather than erroring out when the node is missing.
require.NoError(t, graph.DeleteNode(ctx, node1.PubKeyBytes))
isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
require.NoError(t, err)
require.False(t, isPublic1)
}
18 changes: 15 additions & 3 deletions graph/db/options.go
@@ -13,6 +13,11 @@ const (
// around 40MB.
DefaultChannelCacheSize = 20000

// DefaultPublicNodeCacheSize is the default number of node public
// status entries to cache. With 15k nodes, this produces a cache of
// around 500KB, i.e. roughly 33 bytes per entry, dominated by the
// 33-byte node public key.
DefaultPublicNodeCacheSize = 15000

// DefaultPreAllocCacheNumNodes is the default number of channels we
// assume for mainnet for pre-allocating the graph cache. As of
// September 2021, there currently are 14k nodes in a strictly pruned
@@ -125,6 +130,10 @@ type StoreOptions struct {
// channel cache.
ChannelCacheSize int

// PublicNodeCacheSize is the maximum number of node public status
// entries to hold in the cache.
PublicNodeCacheSize int

// BatchCommitInterval is the maximum duration the batch schedulers will
// wait before attempting to commit a pending set of updates.
BatchCommitInterval time.Duration
@@ -138,9 +147,10 @@ type StoreOptions struct {
// DefaultOptions returns a StoreOptions populated with default values.
func DefaultOptions() *StoreOptions {
return &StoreOptions{
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
NoMigration: false,
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
PublicNodeCacheSize: DefaultPublicNodeCacheSize,
NoMigration: false,
}
}

@@ -169,3 +179,5 @@ func WithBatchCommitInterval(interval time.Duration) StoreOptionModifier {
o.BatchCommitInterval = interval
}
}

// TODO(abdulkbk): consider adding WithPublicNodeCacheSize.
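
A possible shape for the option named in the TODO, mirroring the WithBatchCommitInterval modifier above (a hypothetical sketch, not part of this diff):

// WithPublicNodeCacheSize returns a StoreOptionModifier that overrides the
// default number of public node status entries to cache.
func WithPublicNodeCacheSize(n int) StoreOptionModifier {
return func(o *StoreOptions) {
o.PublicNodeCacheSize = n
}
}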
102 changes: 94 additions & 8 deletions graph/db/sql_store.go
@@ -20,6 +20,8 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/cache"
"github.com/lightninglabs/neutrino/cache/lru"
"github.com/lightningnetwork/lnd/aliasmgr"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/fn/v2"
@@ -181,12 +183,14 @@ type SQLStore struct {
cfg *SQLStoreConfig
db BatchedSQLQueries

// cacheMu guards all caches (rejectCache and chanCache). If
// this mutex will be acquired at the same time as the DB mutex then
// the cacheMu MUST be acquired first to prevent deadlock.
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
// cacheMu guards all caches (rejectCache, chanCache, and
// publicNodeCache). If this mutex will be acquired at the same time as
// the DB mutex then the cacheMu MUST be acquired first to prevent
// deadlock.
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
publicNodeCache *lru.Cache[[33]byte, *cachedPublicNode]

chanScheduler batch.Scheduler[SQLQueries]
nodeScheduler batch.Scheduler[SQLQueries]
@@ -195,6 +199,18 @@ type SQLStore struct {
srcNodeMu sync.Mutex
}

// cachedPublicNode is a simple wrapper for a boolean value that can be
// stored in an LRU cache. The LRU cache requires entries to implement a
// Size() method.
type cachedPublicNode struct {
isPublic bool
}

// Size returns the size of the cache entry. We return 1 as we just want to
// limit the number of entries rather than their actual memory size.
func (c *cachedPublicNode) Size() (uint64, error) {
return 1, nil
}
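
Because Size always reports 1, the capacity passed to lru.NewCache acts as a pure entry count. A minimal, self-contained sketch of that behavior, assuming the neutrino cache/lru API as used in this diff (keys and capacity are illustrative):

package main

import (
"fmt"

"github.com/lightninglabs/neutrino/cache/lru"
)

type entry struct{ isPublic bool }

// Size returns 1 so that the cache capacity bounds the number of entries
// rather than their memory footprint.
func (e *entry) Size() (uint64, error) { return 1, nil }

func main() {
// A capacity of 2 means at most two entries live in the cache.
c := lru.NewCache[[33]byte, *entry](2)

var a, b, d [33]byte
a[0], b[0], d[0] = 1, 2, 3

_, _ = c.Put(a, &entry{isPublic: true})
_, _ = c.Put(b, &entry{isPublic: false})
_, _ = c.Put(d, &entry{isPublic: true}) // Evicts a, the LRU entry.

if _, err := c.Get(a); err != nil {
fmt.Println(err) // Expect the element-not-found error.
}
}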

// A compile-time assertion to ensure that SQLStore implements the V1Store
// interface.
var _ V1Store = (*SQLStore)(nil)
@@ -229,7 +245,10 @@ func NewSQLStore(cfg *SQLStoreConfig, db BatchedSQLQueries,
db: db,
rejectCache: newRejectCache(opts.RejectCacheSize),
chanCache: newChannelCache(opts.ChannelCacheSize),
srcNodes: make(map[ProtocolVersion]*srcNodeInfo),
publicNodeCache: lru.NewCache[[33]byte, *cachedPublicNode](
uint64(opts.PublicNodeCacheSize),
),
srcNodes: make(map[ProtocolVersion]*srcNodeInfo),
}

s.chanScheduler = batch.NewTimeScheduler(
@@ -416,6 +435,10 @@ func (s *SQLStore) DeleteNode(ctx context.Context,
return fmt.Errorf("unable to delete node: %w", err)
}

s.cacheMu.Lock()
s.removePublicNodeCache(pubKey)
s.cacheMu.Unlock()

return nil
}

@@ -715,6 +738,10 @@ func (s *SQLStore) AddChannelEdge(ctx context.Context,
default:
s.rejectCache.remove(edge.ChannelID)
s.chanCache.remove(edge.ChannelID)
s.removePublicNodeCache(
edge.NodeKey1Bytes, edge.NodeKey2Bytes,
)

return nil
}
},
@@ -1721,6 +1748,7 @@ func (s *SQLStore) MarkEdgeZombie(chanID uint64,

s.rejectCache.remove(chanID)
s.chanCache.remove(chanID)
s.removePublicNodeCache(pubKey1, pubKey2)

return nil
}
@@ -1946,6 +1974,14 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
s.chanCache.remove(chanID)
}

var pubkeys [][33]byte
for _, edge := range edges {
pubkeys = append(
pubkeys, edge.NodeKey1Bytes, edge.NodeKey2Bytes,
)
}
s.removePublicNodeCache(pubkeys...)

return edges, nil
}

@@ -2281,8 +2317,28 @@ func (s *SQLStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
func (s *SQLStore) IsPublicNode(pubKey [33]byte) (bool, error) {
ctx := context.TODO()

// Check the cache first with a read lock.
s.cacheMu.RLock()
cached, err := s.publicNodeCache.Get(pubKey)

switch {
case errors.Is(err, cache.ErrElementNotFound):
// Cache miss, so we'll fall through and fetch the node's status
// from the database.

case cached != nil:
s.cacheMu.RUnlock()
return cached.isPublic, nil

case err != nil:
// Don't release the lock here; the RUnlock below the switch is
// the single release point on this path.
log.Warnf("unable to query the public node cache: %v", err)
}

s.cacheMu.RUnlock()

var isPublic bool
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
err = s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
var err error
isPublic, err = db.IsPublicV1Node(ctx, pubKey[:])

@@ -2293,6 +2349,17 @@ func (s *SQLStore) IsPublicNode(pubKey [33]byte) (bool, error) {
"public: %w", err)
}

// Store the result in cache.
s.cacheMu.Lock()
_, err = s.publicNodeCache.Put(pubKey, &cachedPublicNode{
isPublic: isPublic,
})
if err != nil {
log.Warnf("unable to store node info in cache: %w", err)
}

s.cacheMu.Unlock()

return isPublic, nil
}

@@ -2644,6 +2711,9 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
for _, channel := range closedChans {
s.rejectCache.remove(channel.ChannelID)
s.chanCache.remove(channel.ChannelID)
s.removePublicNodeCache(
channel.NodeKey1Bytes, channel.NodeKey2Bytes,
)
}

return closedChans, prunedNodes, nil
@@ -2908,9 +2978,15 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
"height: %w", err)
}

s.cacheMu.Lock()
defer s.cacheMu.Unlock()

for _, channel := range removedChans {
s.rejectCache.remove(channel.ChannelID)
s.chanCache.remove(channel.ChannelID)
s.removePublicNodeCache(
channel.NodeKey1Bytes, channel.NodeKey2Bytes,
)
}

return removedChans, nil
@@ -5733,3 +5809,13 @@ func handleZombieMarking(ctx context.Context, db SQLQueries,
},
)
}

// removePublicNodeCache takes a list of public keys and removes the
// corresponding node entries from the cache if they exist.
//
// NOTE: This method must be called with cacheMu held.
func (s *SQLStore) removePublicNodeCache(pubkeys ...[33]byte) {
for _, pubkey := range pubkeys {
s.publicNodeCache.Delete(pubkey)
}
}
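
For completeness, the call-site pattern the NOTE requires, as already used by DeleteNode above (a hypothetical wrapper, not part of this diff):

// removeNodesFromCache illustrates the locking contract: cacheMu is
// acquired before the cache is touched and, per the comment on SQLStore,
// before any DB mutex in order to prevent deadlock.
func (s *SQLStore) removeNodesFromCache(pubkeys ...[33]byte) {
s.cacheMu.Lock()
defer s.cacheMu.Unlock()

s.removePublicNodeCache(pubkeys...)
}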