Skip to content

Introduce MPTCP #1811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,26 @@ static int anetV6Only(char *err, int s) {
return ANET_OK;
}

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
/* Until glibc 2.39, getaddrinfo with hints.ai_protocol of IPPROTO_MPTCP leads error.
* Use hints.ai_protocol IPPROTO_IP (0) or IPPROTO_TCP (6) to resolve address and overwrite
* it when MPTCP is enabled.
* Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/testing/selftests/net/mptcp/mptcp_connect.c
*/
static int anetTcpGetProtocol(char *err, int ai_protocol, int mptcp) {
if (mptcp) {
#ifdef IPPROTO_MPTCP
UNUSED(err);
return IPPROTO_MPTCP;
}
#else
anetSetError(err, "MPTCP is not supported on this platform");
return ANET_ERR;
}
#endif
return ai_protocol;
}

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int mptcp) {
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
Expand All @@ -591,7 +610,10 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) continue;
rv = anetTcpGetProtocol(err, p->ai_protocol, mptcp);
if (rv == ANET_ERR) goto error;

if ((s = socket(p->ai_family, p->ai_socktype, rv)) == -1) continue;

if (af == AF_INET6 && anetV6Only(err, s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;
Expand All @@ -611,12 +633,12 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
return s;
}

int anetTcpServer(char *err, int port, char *bindaddr, int backlog) {
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp) {
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, mptcp);
}

int anetTcp6Server(char *err, int port, char *bindaddr, int backlog) {
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp) {
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, mptcp);
}

int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group) {
Expand Down
4 changes: 2 additions & 2 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
int anetTcpNonBlockConnect(char *err, const char *addr, int port);
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len, int flags);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int mptcp);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int mptcp);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog, char *group);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3276,6 +3276,7 @@ standardConfig static_configs[] = {
createIntConfig("timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.maxidletime, 0, INTEGER_CONFIG, NULL, NULL), /* Default client timeout: infinite */
createIntConfig("replica-announce-port", "slave-announce-port", MODIFIABLE_CONFIG, 0, 65535, server.replica_announce_port, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
createBoolConfig("mptcp", NULL, IMMUTABLE_CONFIG, server.mptcp, 0, NULL, NULL), /* Multipath TCP. */
createIntConfig("cluster-port", NULL, IMMUTABLE_CONFIG, 0, 65535, server.cluster_port, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Default: Use +10000 offset. */
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, updateClusterAnnouncedPort), /* Use server.port */
Expand Down
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2651,10 +2651,10 @@ int listenToPort(connListener *sfd) {
if (optional) addr++;
if (strchr(addr, ':')) {
/* Bind IPv6 address. */
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog);
sfd->fd[sfd->count] = anetTcp6Server(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
} else {
/* Bind IPv4 address. */
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog);
sfd->fd[sfd->count] = anetTcpServer(server.neterr, port, addr, server.tcp_backlog, server.mptcp);
}
if (sfd->fd[sfd->count] == ANET_ERR) {
int net_errno = errno;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,7 @@ struct valkeyServer {
int port; /* TCP listening port */
int tls_port; /* TLS listening port */
int tcp_backlog; /* TCP listen() backlog */
int mptcp; /* Use Multipath TCP */
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *bind_source_addr; /* Source address to bind on for outgoing connections */
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ start_server {tags {"introspection"}} {
rdbchecksum
daemonize
tcp-backlog
mptcp
always-show-logo
syslog-enabled
cluster-enabled
Expand Down
9 changes: 9 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ port 6379
# in order to get the desired effect.
tcp-backlog 511

# Multipath TCP
#
# Multipath TCP (MPTCP) allows a single transport connection to use multiple network
# interfaces or paths. MPTCP is useful for applications like bandwidth aggregation,
# failover, and more resilient connections.
# Note that MPTCP is supported in the official Linux kernel starting with version 5.6.
#
# mptcp yes

# Unix socket.
#
# Specify the path for the Unix socket that will be used to listen for
Expand Down
Loading