Skip to content

Commit cbfb11f

Browse files
committed
session: add session ID to grpc metadata via context
Add grpc interceptors that inject an LNC session's ID into the context as gRPC metadata. By injecting it as such, it will be transported over the wire in any outgoing gRPC calls. This lets us be sure that any session call sent to the RPCMiddleware interceptor in LND will continue to be grouped along with the appropriate session ID. This gives LND a way to send the metadata we include back to LiT meaning that we will later on be able to extract the session ID again.
1 parent 9018cd4 commit cbfb11f

File tree

3 files changed

+132
-4
lines changed

3 files changed

+132
-4
lines changed

session/context.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package session
2+
3+
import (
4+
"encoding/hex"
5+
"fmt"
6+
7+
"github.com/lightningnetwork/lnd/fn"
8+
"google.golang.org/grpc/metadata"
9+
)
10+
11+
// contextKey is a struct that is used as a key for storing session IDs
12+
// in a context. Using this unexported type prevents collisions with other
13+
// context keys that may be used in the same context. However, this only
14+
// applies if the context is passed around in the same binary and not if the
15+
// value is converted to grpc metadata and sent over the wire. In that case,
16+
// we need to use a string key to avoid collisions with other metadata keys.
17+
type contextKey struct {
18+
name string
19+
}
20+
21+
// sessionIDCtxKey is the context key used to store the session ID in
22+
// a context. The key is a string to avoid collisions with other context values
23+
// that may also be included in grpc metadata which is why we add the 'lit'
24+
// prefix.
25+
var sessionIDCtxKey = contextKey{"lit_session_id"}
26+
27+
// FromGRPCMetadata extracts the session ID from the given gRPC metadata kv
28+
// pairs if one is found.
29+
func FromGRPCMetadata(md metadata.MD) (fn.Option[ID], error) {
30+
val := md.Get(sessionIDCtxKey.name)
31+
if len(val) == 0 {
32+
return fn.None[ID](), nil
33+
}
34+
35+
if len(val) != 1 {
36+
return fn.None[ID](), fmt.Errorf("more than one session ID "+
37+
"found in gRPC metadata: %v", val)
38+
}
39+
40+
b, err := hex.DecodeString(val[0])
41+
if err != nil {
42+
return fn.None[ID](), err
43+
}
44+
45+
sessID, err := IDFromBytes(b)
46+
if err != nil {
47+
return fn.None[ID](), err
48+
}
49+
50+
return fn.Some(sessID), nil
51+
}
52+
53+
// AddToGRPCMetadata adds the session ID to the given gRPC metadata kv pairs.
54+
// The session ID is encoded as a hex string.
55+
func AddToGRPCMetadata(md metadata.MD, id ID) {
56+
md.Set(sessionIDCtxKey.name, hex.EncodeToString(id[:]))
57+
}

session/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818

1919
type sessionID [33]byte
2020

21-
type GRPCServerCreator func(opts ...grpc.ServerOption) *grpc.Server
21+
type GRPCServerCreator func(sessionID ID,
22+
opts ...grpc.ServerOption) *grpc.Server
2223

2324
type mailboxSession struct {
2425
server *grpc.Server
@@ -70,7 +71,7 @@ func (m *mailboxSession) start(session *Session,
7071
}
7172

7273
noiseConn := mailbox.NewNoiseGrpcConn(keys)
73-
m.server = serverCreator(grpc.Creds(noiseConn))
74+
m.server = serverCreator(session.ID, grpc.Creds(noiseConn))
7475

7576
m.wg.Add(1)
7677
go m.run(mailboxServer)

session_rpcserver.go

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/lightningnetwork/lnd/fn"
2727
"github.com/lightningnetwork/lnd/macaroons"
2828
"google.golang.org/grpc"
29+
"google.golang.org/grpc/metadata"
2930
"gopkg.in/macaroon-bakery.v2/bakery"
3031
"gopkg.in/macaroon-bakery.v2/bakery/checkers"
3132
"gopkg.in/macaroon.v2"
@@ -77,10 +78,23 @@ func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer,
7778
// actual mailbox server that spins up the Terminal Connect server
7879
// interface.
7980
server := session.NewServer(
80-
func(opts ...grpc.ServerOption) *grpc.Server {
81-
allOpts := append(cfg.grpcOptions, opts...)
81+
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
82+
// Add the session ID injector interceptors first so
83+
// that the session ID is available in the context of
84+
// all interceptors that come after.
85+
allOpts := []grpc.ServerOption{
86+
addSessionIDToStreamCtx(id),
87+
addSessionIDToUnaryCtx(id),
88+
}
89+
90+
allOpts = append(allOpts, cfg.grpcOptions...)
91+
allOpts = append(allOpts, opts...)
92+
93+
// Construct the gRPC server with the options.
8294
grpcServer := grpc.NewServer(allOpts...)
8395

96+
// Register various grpc servers with the LNC session
97+
// server.
8498
cfg.registerGrpcServers(grpcServer)
8599

86100
return grpcServer
@@ -94,6 +108,62 @@ func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer,
94108
}, nil
95109
}
96110

111+
// wrappedServerStream is a wrapper around the grpc.ServerStream that allows us
112+
// to set a custom context. This is needed since the stream handler function
113+
// doesn't take a context as an argument, but rather has a Context method on the
114+
// handler itself. So we use this custom wrapper to override this method.
115+
type wrappedServerStream struct {
116+
grpc.ServerStream
117+
ctx context.Context
118+
}
119+
120+
// Context returns the context of the stream.
121+
//
122+
// NOTE: This implements the grpc.ServerStream Context method.
123+
func (w *wrappedServerStream) Context() context.Context {
124+
return w.ctx
125+
}
126+
127+
// addSessionIDToStreamCtx is a gRPC stream interceptor that adds the given
128+
// session ID to the context of the stream. This allows us to access the
129+
// session ID later on for any gRPC calls made through this stream.
130+
func addSessionIDToStreamCtx(id session.ID) grpc.ServerOption {
131+
return grpc.StreamInterceptor(func(srv any, ss grpc.ServerStream,
132+
info *grpc.StreamServerInfo,
133+
handler grpc.StreamHandler) error {
134+
135+
md, _ := metadata.FromIncomingContext(ss.Context())
136+
mdCopy := md.Copy()
137+
session.AddToGRPCMetadata(mdCopy, id)
138+
139+
// Wrap the original stream with our custom context.
140+
wrapped := &wrappedServerStream{
141+
ServerStream: ss,
142+
ctx: metadata.NewIncomingContext(
143+
ss.Context(), mdCopy,
144+
),
145+
}
146+
147+
return handler(srv, wrapped)
148+
})
149+
}
150+
151+
// addSessionIDToUnaryCtx is a gRPC unary interceptor that adds the given
152+
// session ID to the context of the unary call. This allows us to access the
153+
// session ID later on for any gRPC calls made through this context.
154+
func addSessionIDToUnaryCtx(id session.ID) grpc.ServerOption {
155+
return grpc.UnaryInterceptor(func(ctx context.Context, req any,
156+
info *grpc.UnaryServerInfo,
157+
handler grpc.UnaryHandler) (resp any, err error) {
158+
159+
md, _ := metadata.FromIncomingContext(ctx)
160+
mdCopy := md.Copy()
161+
session.AddToGRPCMetadata(mdCopy, id)
162+
163+
return handler(metadata.NewIncomingContext(ctx, mdCopy), req)
164+
})
165+
}
166+
97167
// start all the components necessary for the sessionRpcServer to start serving
98168
// requests. This includes resuming all non-revoked sessions.
99169
func (s *sessionRpcServer) start(ctx context.Context) error {

0 commit comments

Comments
 (0)