Skip to content

[sql-47] Actions migration prep #1126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 60 additions & 5 deletions accounts/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package accounts
import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/btcutil"
Expand All @@ -16,8 +18,17 @@ import (
"gopkg.in/macaroon.v2"
)

var (
// ErrServerNotActive indicates that the server has started but hasn't
// fully finished the startup process.
ErrServerNotActive = errors.New("accounts server is still in the " +
"process of starting")
)

// RPCServer is the main server that implements the Accounts gRPC service.
type RPCServer struct {
active atomic.Bool

litrpc.UnimplementedAccountsServer

service *InterceptorService
Expand All @@ -26,13 +37,29 @@ type RPCServer struct {
}

// NewRPCServer returns a new RPC server for the given service.
func NewRPCServer(service *InterceptorService,
superMacBaker litmac.Baker) *RPCServer {
func NewRPCServer() *RPCServer {
return &RPCServer{}
}

// started returns true if the server has been started, and false otherwise.
// NOTE: This function is safe for concurrent access.
func (s *RPCServer) started() bool {
return s.active.Load()
}

return &RPCServer{
service: service,
superMacBaker: superMacBaker,
// Start adds the necessary dependencies for the RPCServer to be able to process
// requests, and starts the RPCServer.
func (s *RPCServer) Start(service *InterceptorService,
superMacBaker litmac.Baker) error {

if s.active.Swap(true) {
return errors.New("accounts rpc server is already started")
}

s.service = service
s.superMacBaker = superMacBaker

return nil
}

// CreateAccount adds an entry to the account database. This entry represents
Expand All @@ -50,6 +77,10 @@ func (s *RPCServer) CreateAccount(ctx context.Context,
req *litrpc.CreateAccountRequest) (*litrpc.CreateAccountResponse,
error) {

if !s.started() {
return nil, ErrServerNotActive
}

log.Infof("[createaccount] label=%v, balance=%d, expiration=%d",
req.Label, req.AccountBalance, req.ExpirationDate)

Expand Down Expand Up @@ -110,6 +141,10 @@ func (s *RPCServer) CreateAccount(ctx context.Context,
func (s *RPCServer) UpdateAccount(ctx context.Context,
req *litrpc.UpdateAccountRequest) (*litrpc.Account, error) {

if !s.started() {
return nil, ErrServerNotActive
}

log.Infof("[updateaccount] id=%s, label=%v, balance=%d, expiration=%d",
req.Id, req.Label, req.AccountBalance, req.ExpirationDate)

Expand All @@ -136,6 +171,10 @@ func (s *RPCServer) CreditAccount(ctx context.Context,
req *litrpc.CreditAccountRequest) (*litrpc.CreditAccountResponse,
error) {

if !s.started() {
return nil, ErrServerNotActive
}

if req.GetAccount() == nil {
return nil, fmt.Errorf("account param must be specified")
}
Expand Down Expand Up @@ -174,6 +213,10 @@ func (s *RPCServer) CreditAccount(ctx context.Context,
func (s *RPCServer) DebitAccount(ctx context.Context,
req *litrpc.DebitAccountRequest) (*litrpc.DebitAccountResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

if req.GetAccount() == nil {
return nil, fmt.Errorf("account param must be specified")
}
Expand Down Expand Up @@ -212,6 +255,10 @@ func (s *RPCServer) DebitAccount(ctx context.Context,
func (s *RPCServer) ListAccounts(ctx context.Context,
_ *litrpc.ListAccountsRequest) (*litrpc.ListAccountsResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

log.Info("[listaccounts]")

// Retrieve all accounts from the macaroon account store.
Expand All @@ -237,6 +284,10 @@ func (s *RPCServer) ListAccounts(ctx context.Context,
func (s *RPCServer) AccountInfo(ctx context.Context,
req *litrpc.AccountInfoRequest) (*litrpc.Account, error) {

if !s.started() {
return nil, ErrServerNotActive
}

log.Infof("[accountinfo] id=%v, label=%v", req.Id, req.Label)

accountID, err := s.findAccount(ctx, req.Id, req.Label)
Expand All @@ -257,6 +308,10 @@ func (s *RPCServer) RemoveAccount(ctx context.Context,
req *litrpc.RemoveAccountRequest) (*litrpc.RemoveAccountResponse,
error) {

if !s.started() {
return nil, ErrServerNotActive
}

log.Infof("[removeaccount] id=%v, label=%v", req.Id, req.Label)

accountID, err := s.findAccount(ctx, req.Id, req.Label)
Expand Down
136 changes: 98 additions & 38 deletions session_rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the commit message doesn't need to be repeated, it could just say that we do a similar separation like in the previous commit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok updated to not repeat the message and just mention the previous commit.

The main I see though for repetition, is that I do think it's much better from future perspective where someone needs to git blame a specific row from that commit. It makes more sense if the actual commit message mentions why the change was needed, rather than referring to a previous commit message as that requires someone to look up the exact ordering in the commit tree.

"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -40,8 +41,17 @@ import (
// other special cases.
const readOnlyAction = "***readonly***"

var (
// ErrServerNotActive indicates that the server has started but hasn't
// fully finished the startup process.
ErrServerNotActive = errors.New("session server is still in the " +
"process of starting")
)

// sessionRpcServer is the gRPC server for the Session RPC interface.
type sessionRpcServer struct {
active atomic.Bool

litrpc.UnimplementedSessionsServer
litrpc.UnimplementedFirewallServer
litrpc.UnimplementedAutopilotServer
Expand Down Expand Up @@ -70,41 +80,10 @@ type sessionRpcServerConfig struct {
privMap firewalldb.PrivacyMapper
}

// newSessionRPCServer creates a new sessionRpcServer using the passed config.
func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer,
error) {

// Create the gRPC server that handles adding/removing sessions and the
// actual mailbox server that spins up the Terminal Connect server
// interface.
server := session.NewServer(
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
// Add the session ID injector interceptors first so
// that the session ID is available in the context of
// all interceptors that come after.
allOpts := []grpc.ServerOption{
addSessionIDToStreamCtx(id),
addSessionIDToUnaryCtx(id),
}

allOpts = append(allOpts, cfg.grpcOptions...)
allOpts = append(allOpts, opts...)

// Construct the gRPC server with the options.
grpcServer := grpc.NewServer(allOpts...)

// Register various grpc servers with the LNC session
// server.
cfg.registerGrpcServers(grpcServer)

return grpcServer
},
)

// newSessionRPCServer creates a new sessionRpcServer.
func newSessionRPCServer() (*sessionRpcServer, error) {
return &sessionRpcServer{
cfg: cfg,
sessionServer: server,
quit: make(chan struct{}),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -164,9 +143,52 @@ func addSessionIDToUnaryCtx(id session.ID) grpc.ServerOption {
})
}

// start all the components necessary for the sessionRpcServer to start serving
// requests. This includes resuming all non-revoked sessions.
func (s *sessionRpcServer) start(ctx context.Context) error {
// started returns true if the server has been started, and false otherwise.
// NOTE: This function is safe for concurrent access.
func (s *sessionRpcServer) started() bool {
return s.active.Load()
}

// start starts a new sessionRpcServer using the passed config, and adds all
// components necessary for the sessionRpcServer to start serving requests. This
// includes resuming all non-revoked sessions.
func (s *sessionRpcServer) start(ctx context.Context,
cfg *sessionRpcServerConfig) error {

if s.active.Swap(true) {
return errors.New("session rpc server is already started")
}

// Create the gRPC server that handles adding/removing sessions and the
// actual mailbox server that spins up the Terminal Connect server
// interface.
server := session.NewServer(
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
// Add the session ID injector interceptors first so
// that the session ID is available in the context of
// all interceptors that come after.
allOpts := []grpc.ServerOption{
addSessionIDToStreamCtx(id),
addSessionIDToUnaryCtx(id),
}

allOpts = append(allOpts, cfg.grpcOptions...)
allOpts = append(allOpts, opts...)

// Construct the gRPC server with the options.
grpcServer := grpc.NewServer(allOpts...)

// Register various grpc servers with the LNC session
// server.
cfg.registerGrpcServers(grpcServer)

return grpcServer
},
)

s.cfg = cfg
s.sessionServer = server

// Delete all sessions in the Reserved state.
err := s.cfg.db.DeleteReservedSessions(ctx)
if err != nil {
Expand Down Expand Up @@ -255,7 +277,9 @@ func (s *sessionRpcServer) start(ctx context.Context) error {
func (s *sessionRpcServer) stop() error {
var returnErr error
s.stopOnce.Do(func() {
s.sessionServer.Stop()
if s.sessionServer != nil {
s.sessionServer.Stop()
}

close(s.quit)
s.wg.Wait()
Expand All @@ -268,6 +292,10 @@ func (s *sessionRpcServer) stop() error {
func (s *sessionRpcServer) AddSession(ctx context.Context,
req *litrpc.AddSessionRequest) (*litrpc.AddSessionResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

expiry := time.Unix(int64(req.ExpiryTimestampSeconds), 0)
if time.Now().After(expiry) {
return nil, fmt.Errorf("expiry must be in the future")
Expand Down Expand Up @@ -618,6 +646,10 @@ func (s *sessionRpcServer) resumeSession(ctx context.Context,
func (s *sessionRpcServer) ListSessions(ctx context.Context,
_ *litrpc.ListSessionsRequest) (*litrpc.ListSessionsResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

sessions, err := s.cfg.db.ListAllSessions(ctx)
if err != nil {
return nil, fmt.Errorf("error fetching sessions: %v", err)
Expand All @@ -642,6 +674,10 @@ func (s *sessionRpcServer) ListSessions(ctx context.Context,
func (s *sessionRpcServer) RevokeSession(ctx context.Context,
req *litrpc.RevokeSessionRequest) (*litrpc.RevokeSessionResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
if err != nil {
return nil, fmt.Errorf("error parsing public key: %v", err)
Expand Down Expand Up @@ -676,6 +712,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
req *litrpc.PrivacyMapConversionRequest) (
*litrpc.PrivacyMapConversionResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

var (
groupID session.ID
err error
Expand Down Expand Up @@ -733,6 +773,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
func (s *sessionRpcServer) ListActions(ctx context.Context,
req *litrpc.ListActionsRequest) (*litrpc.ListActionsResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

// If no maximum number of actions is given, use a default of 100.
if req.MaxNumActions == 0 {
req.MaxNumActions = 100
Expand Down Expand Up @@ -841,6 +885,10 @@ func (s *sessionRpcServer) ListAutopilotFeatures(ctx context.Context,
_ *litrpc.ListAutopilotFeaturesRequest) (
*litrpc.ListAutopilotFeaturesResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

fs, err := s.cfg.autopilot.ListFeatures(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -884,6 +932,10 @@ func (s *sessionRpcServer) AddAutopilotSession(ctx context.Context,
req *litrpc.AddAutopilotSessionRequest) (
*litrpc.AddAutopilotSessionResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

if len(req.Features) == 0 {
return nil, fmt.Errorf("must include at least one feature")
}
Expand Down Expand Up @@ -1325,6 +1377,10 @@ func (s *sessionRpcServer) ListAutopilotSessions(ctx context.Context,
_ *litrpc.ListAutopilotSessionsRequest) (
*litrpc.ListAutopilotSessionsResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

sessions, err := s.cfg.db.ListSessionsByType(ctx, session.TypeAutopilot)
if err != nil {
return nil, fmt.Errorf("error fetching sessions: %v", err)
Expand All @@ -1349,6 +1405,10 @@ func (s *sessionRpcServer) RevokeAutopilotSession(ctx context.Context,
req *litrpc.RevokeAutopilotSessionRequest) (
*litrpc.RevokeAutopilotSessionResponse, error) {

if !s.started() {
return nil, ErrServerNotActive
}

pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
if err != nil {
return nil, fmt.Errorf("error parsing public key: %v", err)
Expand Down
Loading
Loading