Skip to content

Commit f86953c

Browse files
Implement ECMP for unsafe_routes (#1332)
1 parent 3de36c9 commit f86953c

21 files changed

+689
-81
lines changed

Diff for: examples/config.yml

+22-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,28 @@ tun:
239239

240240
# Unsafe routes allows you to route traffic over nebula to non-nebula nodes
241241
# Unsafe routes should be avoided unless you have hosts/services that cannot run nebula
242-
# NOTE: The nebula certificate of the "via" node *MUST* have the "route" defined as a subnet in its certificate
242+
# Supports weighted ECMP if you define a list of gateways, this can be used for load balancing or redundancy to hosts outside of nebula
243+
# NOTES:
244+
# * You will only see a single gateway in the routing table if you are not on linux
245+
# * If a gateway is not reachable through the overlay another gateway will be selected to send the traffic through, ignoring weights
246+
#
247+
# unsafe_routes:
248+
# # Multiple gateways without defining a weight defaults to a weight of 1, this will balance traffic equally between the three gateways
249+
# - route: 192.168.87.0/24
250+
# via:
251+
# - gateway: 10.0.0.1
252+
# - gateway: 10.0.0.2
253+
# - gateway: 10.0.0.3
254+
# # Multiple gateways with a weight, this will balance traffic accordingly
255+
# - route: 192.168.87.0/24
256+
# via:
257+
# - gateway: 10.0.0.1
258+
# weight: 10
259+
# - gateway: 10.0.0.2
260+
# weight: 5
261+
#
262+
# NOTE: The nebula certificate of the "via" node(s) *MUST* have the "route" defined as a subnet in its certificate
263+
# `via`: single node or list of gateways to use for this route
243264
# `mtu`: will default to tun mtu if this option is not specified
244265
# `metric`: will default to 0 if this option is not specified
245266
# `install`: will default to true, controls whether this route is installed in the systems routing table.

Diff for: inside.go

+83-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/slackhq/nebula/header"
99
"github.com/slackhq/nebula/iputil"
1010
"github.com/slackhq/nebula/noiseutil"
11+
"github.com/slackhq/nebula/routing"
1112
)
1213

1314
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
@@ -49,7 +50,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
4950
return
5051
}
5152

52-
hostinfo, ready := f.getOrHandshake(fwPacket.RemoteAddr, func(hh *HandshakeHostInfo) {
53+
hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) {
5354
hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
5455
})
5556

@@ -121,22 +122,94 @@ func (f *Interface) rejectOutside(packet []byte, ci *ConnectionState, hostinfo *
121122
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, out, nb, packet, q)
122123
}
123124

125+
// Handshake will attempt to initiate a tunnel with the provided vpn address if it is within our vpn networks. This is a no-op if the tunnel is already established or being established
124126
func (f *Interface) Handshake(vpnAddr netip.Addr) {
125-
f.getOrHandshake(vpnAddr, nil)
127+
f.getOrHandshakeNoRouting(vpnAddr, nil)
126128
}
127129

128-
// getOrHandshake returns nil if the vpnAddr is not routable.
130+
// getOrHandshakeNoRouting returns nil if the vpnAddr is not routable.
129131
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel
130-
func (f *Interface) getOrHandshake(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
132+
func (f *Interface) getOrHandshakeNoRouting(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
131133
_, found := f.myVpnNetworksTable.Lookup(vpnAddr)
132-
if !found {
133-
vpnAddr = f.inside.RouteFor(vpnAddr)
134-
if !vpnAddr.IsValid() {
135-
return nil, false
134+
if found {
135+
return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
136+
}
137+
138+
return nil, false
139+
}
140+
141+
// getOrHandshakeConsiderRouting will try to find the HostInfo to handle this packet, starting a handshake if necessary.
142+
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel.
143+
func (f *Interface) getOrHandshakeConsiderRouting(fwPacket *firewall.Packet, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
144+
145+
destinationAddr := fwPacket.RemoteAddr
146+
147+
hostinfo, ready := f.getOrHandshakeNoRouting(destinationAddr, cacheCallback)
148+
149+
// Host is inside the mesh, no routing required
150+
if hostinfo != nil {
151+
return hostinfo, ready
152+
}
153+
154+
gateways := f.inside.RoutesFor(destinationAddr)
155+
156+
switch len(gateways) {
157+
case 0:
158+
return nil, false
159+
case 1:
160+
// Single gateway route
161+
return f.handshakeManager.GetOrHandshake(gateways[0].Addr(), cacheCallback)
162+
default:
163+
// Multi gateway route, perform ECMP categorization
164+
gatewayAddr, balancingOk := routing.BalancePacket(fwPacket, gateways)
165+
166+
if !balancingOk {
167+
// This happens if the gateway buckets were not calculated, this _should_ never happen
168+
f.l.Error("Gateway buckets not calculated, fallback from ECMP to random routing. Please report this bug.")
136169
}
170+
171+
var handshakeInfoForChosenGateway *HandshakeHostInfo
172+
var hhReceiver = func(hh *HandshakeHostInfo) {
173+
handshakeInfoForChosenGateway = hh
174+
}
175+
176+
// Store the handshakeHostInfo for later.
177+
// If this node is not reachable we will attempt other nodes, if none are reachable we will
178+
// cache the packet for this gateway.
179+
if hostinfo, ready = f.handshakeManager.GetOrHandshake(gatewayAddr, hhReceiver); ready {
180+
return hostinfo, true
181+
}
182+
183+
// It appears the selected gateway cannot be reached, find another gateway to fallback on.
184+
// The current implementation breaks ECMP but that seems better than no connectivity.
185+
// If ECMP is also required when a gateway is down then connectivity status
186+
// for each gateway needs to be kept and the weights recalculated when they go up or down.
187+
// This would also need to interact with unsafe_route updates through reloading the config or
188+
// use of the use_system_route_table option
189+
190+
if f.l.Level >= logrus.DebugLevel {
191+
f.l.WithField("destination", destinationAddr).
192+
WithField("originalGateway", gatewayAddr).
193+
Debugln("Calculated gateway for ECMP not available, attempting other gateways")
194+
}
195+
196+
for i := range gateways {
197+
// Skip the gateway that failed previously
198+
if gateways[i].Addr() == gatewayAddr {
199+
continue
200+
}
201+
202+
// We do not need the HandshakeHostInfo since we cache the packet in the originally chosen gateway
203+
if hostinfo, ready = f.handshakeManager.GetOrHandshake(gateways[i].Addr(), nil); ready {
204+
return hostinfo, true
205+
}
206+
}
207+
208+
// No gateways reachable, cache the packet in the originally chosen gateway
209+
cacheCallback(handshakeInfoForChosenGateway)
210+
return hostinfo, false
137211
}
138212

139-
return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
140213
}
141214

142215
func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte) {
@@ -163,7 +236,7 @@ func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubTyp
163236

164237
// SendMessageToVpnAddr handles real addr:port lookup and sends to the current best known address for vpnAddr
165238
func (f *Interface) SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte) {
166-
hostInfo, ready := f.getOrHandshake(vpnAddr, func(hh *HandshakeHostInfo) {
239+
hostInfo, ready := f.getOrHandshakeNoRouting(vpnAddr, func(hh *HandshakeHostInfo) {
167240
hh.cachePacket(f.l, t, st, p, f.SendMessageToHostInfo, f.cachedPacketMetrics)
168241
})
169242

Diff for: overlay/device.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package overlay
33
import (
44
"io"
55
"net/netip"
6+
7+
"github.com/slackhq/nebula/routing"
68
)
79

810
type Device interface {
911
io.ReadWriteCloser
1012
Activate() error
1113
Networks() []netip.Prefix
1214
Name() string
13-
RouteFor(netip.Addr) netip.Addr
15+
RoutesFor(netip.Addr) routing.Gateways
1416
NewMultiQueueReader() (io.ReadWriteCloser, error)
1517
}

Diff for: overlay/route.go

+65-13
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ import (
1111
"github.com/gaissmai/bart"
1212
"github.com/sirupsen/logrus"
1313
"github.com/slackhq/nebula/config"
14+
"github.com/slackhq/nebula/routing"
1415
)
1516

1617
type Route struct {
1718
MTU int
1819
Metric int
1920
Cidr netip.Prefix
20-
Via netip.Addr
21+
Via routing.Gateways
2122
Install bool
2223
}
2324

@@ -47,15 +48,17 @@ func (r Route) String() string {
4748
return s
4849
}
4950

50-
func makeRouteTree(l *logrus.Logger, routes []Route, allowMTU bool) (*bart.Table[netip.Addr], error) {
51-
routeTree := new(bart.Table[netip.Addr])
51+
func makeRouteTree(l *logrus.Logger, routes []Route, allowMTU bool) (*bart.Table[routing.Gateways], error) {
52+
routeTree := new(bart.Table[routing.Gateways])
5253
for _, r := range routes {
5354
if !allowMTU && r.MTU > 0 {
5455
l.WithField("route", r).Warnf("route MTU is not supported in %s", runtime.GOOS)
5556
}
5657

57-
if r.Via.IsValid() {
58-
routeTree.Insert(r.Cidr, r.Via)
58+
gateways := r.Via
59+
if len(gateways) > 0 {
60+
routing.CalculateBucketsForGateways(gateways)
61+
routeTree.Insert(r.Cidr, gateways)
5962
}
6063
}
6164
return routeTree, nil
@@ -201,14 +204,63 @@ func parseUnsafeRoutes(c *config.C, networks []netip.Prefix) ([]Route, error) {
201204
return nil, fmt.Errorf("entry %v.via in tun.unsafe_routes is not present", i+1)
202205
}
203206

204-
via, ok := rVia.(string)
205-
if !ok {
206-
return nil, fmt.Errorf("entry %v.via in tun.unsafe_routes is not a string: found %T", i+1, rVia)
207-
}
207+
var gateways routing.Gateways
208208

209-
viaVpnIp, err := netip.ParseAddr(via)
210-
if err != nil {
211-
return nil, fmt.Errorf("entry %v.via in tun.unsafe_routes failed to parse address: %v", i+1, err)
209+
switch via := rVia.(type) {
210+
case string:
211+
viaIp, err := netip.ParseAddr(via)
212+
if err != nil {
213+
return nil, fmt.Errorf("entry %v.via in tun.unsafe_routes failed to parse address: %v", i+1, err)
214+
}
215+
216+
gateways = routing.Gateways{routing.NewGateway(viaIp, 1)}
217+
218+
case []interface{}:
219+
gateways = make(routing.Gateways, len(via))
220+
for ig, v := range via {
221+
gatewayMap, ok := v.(map[interface{}]interface{})
222+
if !ok {
223+
return nil, fmt.Errorf("entry %v in tun.unsafe_routes[%v].via is invalid", i+1, ig+1)
224+
}
225+
226+
rGateway, ok := gatewayMap["gateway"]
227+
if !ok {
228+
return nil, fmt.Errorf("entry .gateway in tun.unsafe_routes[%v].via[%v] is not present", i+1, ig+1)
229+
}
230+
231+
parsedGateway, ok := rGateway.(string)
232+
if !ok {
233+
return nil, fmt.Errorf("entry .gateway in tun.unsafe_routes[%v].via[%v] is not a string", i+1, ig+1)
234+
}
235+
236+
gatewayIp, err := netip.ParseAddr(parsedGateway)
237+
if err != nil {
238+
return nil, fmt.Errorf("entry .gateway in tun.unsafe_routes[%v].via[%v] failed to parse address: %v", i+1, ig+1, err)
239+
}
240+
241+
rGatewayWeight, ok := gatewayMap["weight"]
242+
if !ok {
243+
rGatewayWeight = 1
244+
}
245+
246+
gatewayWeight, ok := rGatewayWeight.(int)
247+
if !ok {
248+
_, err = strconv.ParseInt(rGatewayWeight.(string), 10, 32)
249+
if err != nil {
250+
return nil, fmt.Errorf("entry .weight in tun.unsafe_routes[%v].via[%v] is not an integer", i+1, ig+1)
251+
}
252+
}
253+
254+
if gatewayWeight < 1 || gatewayWeight > math.MaxInt32 {
255+
return nil, fmt.Errorf("entry .weight in tun.unsafe_routes[%v].via[%v] is not in range (1-%d) : %v", i+1, ig+1, math.MaxInt32, gatewayWeight)
256+
}
257+
258+
gateways[ig] = routing.NewGateway(gatewayIp, gatewayWeight)
259+
260+
}
261+
262+
default:
263+
return nil, fmt.Errorf("entry %v.via in tun.unsafe_routes is not a string or list of gateways: found %T", i+1, rVia)
212264
}
213265

214266
rRoute, ok := m["route"]
@@ -226,7 +278,7 @@ func parseUnsafeRoutes(c *config.C, networks []netip.Prefix) ([]Route, error) {
226278
}
227279

228280
r := Route{
229-
Via: viaVpnIp,
281+
Via: gateways,
230282
MTU: mtu,
231283
Metric: metric,
232284
Install: install,

0 commit comments

Comments
 (0)