Skip to content

Commit 7ed06ec

Browse files
committed
Add ENOBUFS handling for unsolicited messages
This can happen when large burst of messages come all of a sudden, which happen very easily when routing protocols are involved (e.g. BGP). The current implementation incorrectly assumes that any failure to read from the socket is akin to the socket closed. This is not the case. This adds handling for this specific error, which translates to a wrapper struct in the unsolicited messages stream: either a message, or an overrun. This lets applications handle best for their usecase such event: either resync because messages are lost, or do nothing if the listening is informational only (e.g. logging).
1 parent 6c73fd1 commit 7ed06ec

File tree

5 files changed

+101
-37
lines changed

5 files changed

+101
-37
lines changed

examples/audit_netlink_events.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::process;
2929

3030
use netlink_proto::{
3131
new_connection,
32+
packet::NetlinkEvent,
3233
sys::{protocols::NETLINK_AUDIT, SocketAddr},
3334
};
3435

@@ -45,10 +46,10 @@ async fn main() -> Result<(), String> {
4546
// - `handle` is a `Handle` to the `Connection`. We use it to send netlink
4647
// messages and receive responses to these messages.
4748
//
48-
// - `messages` is a channel receiver through which we receive messages that we
49+
// - `events` is a channel receiver through which we receive messages that we
4950
// have not sollicated, ie that are not response to a request we made. In this
5051
// example, we'll receive the audit event through that channel.
51-
let (conn, mut handle, mut messages) = new_connection(NETLINK_AUDIT)
52+
let (conn, mut handle, mut events) = new_connection(NETLINK_AUDIT)
5253
.map_err(|e| format!("Failed to create a new netlink connection: {}", e))?;
5354

5455
// Spawn the `Connection` so that it starts polling the netlink
@@ -85,13 +86,20 @@ async fn main() -> Result<(), String> {
8586
}
8687
});
8788

88-
// Finally, start receiving event through the `messages` channel.
89+
// Finally, start receiving event through the `events` channel.
8990
println!("Starting to print audit events... press ^C to interrupt");
90-
while let Some((message, _addr)) = messages.next().await {
91-
if let NetlinkPayload::Error(err_message) = message.payload {
92-
eprintln!("received an error message: {:?}", err_message);
93-
} else {
94-
println!("{:?}", message);
91+
while let Some(event) = events.next().await {
92+
match event {
93+
NetlinkEvent::Message((message, _addr)) => {
94+
if let NetlinkPayload::Error(err_message) = message.payload {
95+
eprintln!("received an error message: {:?}", err_message);
96+
} else {
97+
println!("{:?}", message);
98+
}
99+
}
100+
NetlinkEvent::Overrun => {
101+
println!("Netlink socket overrun. Some messages were lost");
102+
}
95103
}
96104
}
97105

src/connection.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures::{
1313
};
1414
use log::{error, warn};
1515
use netlink_packet_core::{
16-
NetlinkDeserializable, NetlinkMessage, NetlinkPayload, NetlinkSerializable,
16+
NetlinkDeserializable, NetlinkEvent, NetlinkMessage, NetlinkPayload, NetlinkSerializable,
1717
};
1818

1919
use crate::{
@@ -46,7 +46,7 @@ where
4646

4747
/// Channel used to transmit to the ConnectionHandle the unsolicited
4848
/// messages received from the socket (multicast messages for instance).
49-
unsolicited_messages_tx: Option<UnboundedSender<(NetlinkMessage<T>, SocketAddr)>>,
49+
unsolicited_messages_tx: Option<UnboundedSender<NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>>>,
5050

5151
socket_closed: bool,
5252
}
@@ -59,7 +59,7 @@ where
5959
{
6060
pub(crate) fn new(
6161
requests_rx: UnboundedReceiver<Request<T>>,
62-
unsolicited_messages_tx: UnboundedSender<(NetlinkMessage<T>, SocketAddr)>,
62+
unsolicited_messages_tx: UnboundedSender<NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>>,
6363
protocol: isize,
6464
) -> io::Result<Self> {
6565
let socket = S::new(protocol)?;
@@ -125,10 +125,14 @@ where
125125
loop {
126126
trace!("polling socket");
127127
match socket.as_mut().poll_next(cx) {
128-
Poll::Ready(Some((message, addr))) => {
128+
Poll::Ready(Some(NetlinkEvent::Message((message, addr)))) => {
129129
trace!("read datagram from socket");
130130
self.protocol.handle_message(message, addr);
131131
}
132+
Poll::Ready(Some(NetlinkEvent::Overrun)) => {
133+
warn!("netlink socket buffer full");
134+
self.protocol.handle_buffer_full();
135+
}
132136
Poll::Ready(None) => {
133137
warn!("netlink socket stream shut down");
134138
self.socket_closed = true;
@@ -159,11 +163,13 @@ where
159163

160164
pub fn forward_unsolicited_messages(&mut self) {
161165
if self.unsolicited_messages_tx.is_none() {
162-
while let Some((message, source)) = self.protocol.incoming_requests.pop_front() {
163-
warn!(
164-
"ignoring unsolicited message {:?} from {:?}",
165-
message, source
166-
);
166+
while let Some(event) = self.protocol.incoming_requests.pop_front() {
167+
match event {
168+
NetlinkEvent::Message((message, source)) => {
169+
warn!("ignoring unsolicited message {message:?} from {source:?}")
170+
}
171+
NetlinkEvent::Overrun => warn!("ignoring unsolicited socket overrun"),
172+
}
167173
}
168174
return;
169175
}
@@ -177,11 +183,11 @@ where
177183
..
178184
} = self;
179185

180-
while let Some((message, source)) = protocol.incoming_requests.pop_front() {
186+
while let Some(event) = protocol.incoming_requests.pop_front() {
181187
if unsolicited_messages_tx
182188
.as_mut()
183189
.unwrap()
184-
.unbounded_send((message, source))
190+
.unbounded_send(event)
185191
.is_err()
186192
{
187193
// The channel is unbounded so the only error that can

src/framed.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ use crate::{
1616
codecs::NetlinkMessageCodec,
1717
sys::{AsyncSocket, SocketAddr},
1818
};
19-
use netlink_packet_core::{NetlinkDeserializable, NetlinkMessage, NetlinkSerializable};
19+
use netlink_packet_core::{
20+
NetlinkDeserializable, NetlinkEvent, NetlinkMessage, NetlinkSerializable,
21+
};
22+
23+
/// Buffer overrun condition
24+
const ENOBUFS: i32 = 105;
2025

2126
pub struct NetlinkFramed<T, S, C> {
2227
socket: S,
@@ -38,7 +43,7 @@ where
3843
S: AsyncSocket,
3944
C: NetlinkMessageCodec,
4045
{
41-
type Item = (NetlinkMessage<T>, SocketAddr);
46+
type Item = NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>;
4247

4348
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4449
let Self {
@@ -50,7 +55,9 @@ where
5055

5156
loop {
5257
match C::decode::<T>(reader) {
53-
Ok(Some(item)) => return Poll::Ready(Some((item, *in_addr))),
58+
Ok(Some(item)) => {
59+
return Poll::Ready(Some(NetlinkEvent::Message((item, *in_addr))))
60+
}
5461
Ok(None) => {}
5562
Err(e) => {
5663
error!("unrecoverable error in decoder: {:?}", e);
@@ -63,6 +70,23 @@ where
6370

6471
*in_addr = match ready!(socket.poll_recv_from(cx, reader)) {
6572
Ok(addr) => addr,
73+
// When receiving messages in multicast mode (i.e. we subscribed to
74+
// notifications), the kernel will not wait for us to read datagrams before
75+
// sending more. The receive buffer has a finite size, so once it is full (no
76+
// more message can fit in), new messages will be dropped and recv calls will
77+
// return `ENOBUFS`.
78+
// This needs to be handled for applications to resynchronize with the contents
79+
// of the kernel if necessary.
80+
// We don't need to do anything special:
81+
// - contents of the reader is still valid because we won't have partial messages
82+
// in there anyways (large enough buffer)
83+
// - contents of the socket's internal buffer is still valid because the kernel
84+
// won't put partial data in it
85+
Err(e) if e.raw_os_error() == Some(ENOBUFS) => {
86+
// ENOBUFS
87+
warn!("netlink socket buffer full");
88+
return Poll::Ready(Some(NetlinkEvent::Overrun));
89+
}
6690
Err(e) => {
6791
error!("failed to read from netlink socket: {:?}", e);
6892
return Poll::Ready(None);

src/lib.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
//! use futures::stream::StreamExt;
1818
//! use netlink_packet_audit::{
1919
//! AuditMessage,
20+
//! NetlinkEvent,
2021
//! NetlinkMessage,
2122
//! NetlinkPayload,
2223
//! StatusMessage,
@@ -44,11 +45,11 @@
4445
//! // - `handle` is a `Handle` to the `Connection`. We use it to send
4546
//! // netlink messages and receive responses to these messages.
4647
//! //
47-
//! // - `messages` is a channel receiver through which we receive
48+
//! // - `events` is a channel receiver through which we receive
4849
//! // messages that we have not solicited, ie that are not
4950
//! // response to a request we made. In this example, we'll receive
5051
//! // the audit event through that channel.
51-
//! let (conn, mut handle, mut messages) = new_connection(NETLINK_AUDIT)
52+
//! let (conn, mut handle, mut events) = new_connection(NETLINK_AUDIT)
5253
//! .map_err(|e| format!("Failed to create a new netlink connection: {}", e))?;
5354
//!
5455
//! // Spawn the `Connection` so that it starts polling the netlink
@@ -85,13 +86,23 @@
8586
//! }
8687
//! });
8788
//!
88-
//! // Finally, start receiving event through the `messages` channel.
89+
//! // Finally, start receiving event through the `events` channel.
8990
//! println!("Starting to print audit events... press ^C to interrupt");
90-
//! while let Some((message, _addr)) = messages.next().await {
91-
//! if let NetlinkPayload::Error(err_message) = message.payload {
92-
//! eprintln!("received an error message: {:?}", err_message);
93-
//! } else {
94-
//! println!("{:?}", message);
91+
//! while let Some(event) = events.next().await {
92+
//! match event {
93+
//! NetlinkEvent::Message((message, _addr)) => {
94+
//! if let NetlinkPayload::Error(err_message) = message.payload {
95+
//! eprintln!("received an error message: {:?}", err_message);
96+
//! } else {
97+
//! println!("{:?}", message);
98+
//! }
99+
//! }
100+
//! // Netlink sockets have a finite receive buffer that can fill up if there are more
101+
//! // messages sent by the kernel than we can read.
102+
//! // In this case at least one message has been lost.
103+
//! NetlinkEvent::Overrun => {
104+
//! println!("Netlink socket overrun. Some messages were lost");
105+
//! }
95106
//! }
96107
//! }
97108
//!
@@ -229,7 +240,9 @@ pub fn new_connection<T>(
229240
) -> io::Result<(
230241
Connection<T>,
231242
ConnectionHandle<T>,
232-
UnboundedReceiver<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
243+
UnboundedReceiver<
244+
packet::NetlinkEvent<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
245+
>,
233246
)>
234247
where
235248
T: Debug + packet::NetlinkSerializable + packet::NetlinkDeserializable + Unpin,
@@ -245,7 +258,9 @@ pub fn new_connection_with_socket<T, S>(
245258
) -> io::Result<(
246259
Connection<T, S>,
247260
ConnectionHandle<T>,
248-
UnboundedReceiver<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
261+
UnboundedReceiver<
262+
packet::NetlinkEvent<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
263+
>,
249264
)>
250265
where
251266
T: Debug + packet::NetlinkSerializable + packet::NetlinkDeserializable + Unpin,
@@ -262,15 +277,19 @@ pub fn new_connection_with_codec<T, S, C>(
262277
) -> io::Result<(
263278
Connection<T, S, C>,
264279
ConnectionHandle<T>,
265-
UnboundedReceiver<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
280+
UnboundedReceiver<
281+
packet::NetlinkEvent<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
282+
>,
266283
)>
267284
where
268285
T: Debug + packet::NetlinkSerializable + packet::NetlinkDeserializable + Unpin,
269286
S: sys::AsyncSocket,
270287
C: NetlinkMessageCodec,
271288
{
272289
let (requests_tx, requests_rx) = unbounded::<Request<T>>();
273-
let (messages_tx, messages_rx) = unbounded::<(packet::NetlinkMessage<T>, sys::SocketAddr)>();
290+
let (messages_tx, messages_rx) = unbounded::<
291+
packet::NetlinkEvent<(packet::NetlinkMessage<T>, sys::SocketAddr)>,
292+
>();
274293
Ok((
275294
Connection::new(requests_rx, messages_tx, protocol)?,
276295
ConnectionHandle::new(requests_tx),

src/protocol/protocol.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use std::{
66
};
77

88
use netlink_packet_core::{
9-
constants::*, NetlinkDeserializable, NetlinkMessage, NetlinkPayload, NetlinkSerializable,
9+
constants::*, NetlinkDeserializable, NetlinkEvent, NetlinkMessage,
10+
NetlinkPayload, NetlinkSerializable,
1011
};
1112

1213
use super::Request;
@@ -53,7 +54,8 @@ pub(crate) struct Protocol<T, M> {
5354
pub incoming_responses: VecDeque<Response<T, M>>,
5455

5556
/// Requests from remote peers
56-
pub incoming_requests: VecDeque<(NetlinkMessage<T>, SocketAddr)>,
57+
pub incoming_requests:
58+
VecDeque<NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>>,
5759

5860
/// The messages to be sent out
5961
pub outgoing_messages: VecDeque<(NetlinkMessage<T>, SocketAddr)>,
@@ -80,10 +82,15 @@ where
8082
if let hash_map::Entry::Occupied(entry) = self.pending_requests.entry(request_id) {
8183
Self::handle_response(&mut self.incoming_responses, entry, message);
8284
} else {
83-
self.incoming_requests.push_back((message, source));
85+
self.incoming_requests
86+
.push_back(NetlinkEvent::Message((message, source)));
8487
}
8588
}
8689

90+
pub fn handle_buffer_full(&mut self) {
91+
self.incoming_requests.push_back(NetlinkEvent::Overrun);
92+
}
93+
8794
fn handle_response(
8895
incoming_responses: &mut VecDeque<Response<T, M>>,
8996
entry: hash_map::OccupiedEntry<RequestId, PendingRequest<M>>,

0 commit comments

Comments
 (0)