Skip to content

Commit 2983de8

Browse files
askaltDifferentialOrange
authored andcommitted
conn: create from socket fd
This patch adds the ability to create Tarantool connection using an existing socket fd. To achieve this, several changes have been made to work with non-blocking sockets, as `socket.socketpair` creates such [1]. The authentication [2] might have already occured when we establish such a connection. If that's the case, there is no need to pass 'user' argument. On success, connect takes ownership of the `socket_fd`. 1. tarantool/tarantool#8944 2. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/authentication/ Closes #304
1 parent b0833e7 commit 2983de8

11 files changed

+394
-25
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## Unreleased
8+
9+
### Added
10+
- The ability to connect to the Tarantool using an existing socket fd (#304).
11+
712
## 1.1.2 - 2023-09-20
813

914
### Fixed

tarantool/__init__.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
__version__ = '0.0.0-dev'
5252

5353

54-
def connect(host="localhost", port=33013, user=None, password=None,
54+
def connect(host="localhost", port=33013, socket_fd=None, user=None, password=None,
5555
encoding=ENCODING_DEFAULT, transport=DEFAULT_TRANSPORT,
5656
ssl_key_file=DEFAULT_SSL_KEY_FILE,
5757
ssl_cert_file=DEFAULT_SSL_CERT_FILE,
@@ -64,6 +64,8 @@ def connect(host="localhost", port=33013, user=None, password=None,
6464
6565
:param port: Refer to :paramref:`~tarantool.Connection.params.port`.
6666
67+
:param socket_fd: Refer to :paramref:`~tarantool.Connection.params.socket_fd`.
68+
6769
:param user: Refer to :paramref:`~tarantool.Connection.params.user`.
6870
6971
:param password: Refer to
@@ -93,6 +95,7 @@ def connect(host="localhost", port=33013, user=None, password=None,
9395
"""
9496

9597
return Connection(host, port,
98+
socket_fd=socket_fd,
9699
user=user,
97100
password=password,
98101
socket_timeout=SOCKET_TIMEOUT,

tarantool/connection.py

+97-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# pylint: disable=too-many-lines,duplicate-code
55

66
import os
7+
import select
78
import time
89
import errno
910
from enum import Enum
@@ -51,6 +52,9 @@
5152
RECONNECT_DELAY,
5253
DEFAULT_TRANSPORT,
5354
SSL_TRANSPORT,
55+
DEFAULT_HOST,
56+
DEFAULT_PORT,
57+
DEFAULT_SOCKET_FD,
5458
DEFAULT_SSL_KEY_FILE,
5559
DEFAULT_SSL_CERT_FILE,
5660
DEFAULT_SSL_CA_FILE,
@@ -594,7 +598,10 @@ class Connection(ConnectionInterface):
594598
:value: :exc:`~tarantool.error.CrudModuleError`
595599
"""
596600

597-
def __init__(self, host, port,
601+
def __init__(self,
602+
host=DEFAULT_HOST,
603+
port=DEFAULT_PORT,
604+
socket_fd=DEFAULT_SOCKET_FD,
598605
user=None,
599606
password=None,
600607
socket_timeout=SOCKET_TIMEOUT,
@@ -623,8 +630,11 @@ def __init__(self, host, port,
623630
Unix sockets.
624631
:type host: :obj:`str` or :obj:`None`
625632
626-
:param port: Server port or Unix socket path.
627-
:type port: :obj:`int` or :obj:`str`
633+
:param port: Server port, or Unix socket path.
634+
:type port: :obj:`int` or :obj:`str` or :obj:`None`
635+
636+
:param socket_fd: socket fd number.
637+
:type socket_fd: :obj:`int` or :obj:`None`
628638
629639
:param user: User name for authentication on the Tarantool
630640
server.
@@ -804,6 +814,18 @@ def __init__(self, host, port,
804814
"""
805815
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements
806816

817+
if host is None and port is None and socket_fd is None:
818+
raise ConfigurationError("need to specify host/port, "
819+
"port (in case of Unix sockets) "
820+
"or socket_fd")
821+
822+
if socket_fd is not None and (host is not None or port is not None):
823+
raise ConfigurationError("specifying both socket_fd and host/port is not allowed")
824+
825+
if host is not None and port is None:
826+
raise ConfigurationError("when specifying host, "
827+
"it is also necessary to specify port")
828+
807829
if msgpack.version >= (1, 0, 0) and encoding not in (None, 'utf-8'):
808830
raise ConfigurationError("msgpack>=1.0.0 only supports None and "
809831
+ "'utf-8' encoding option values")
@@ -820,6 +842,7 @@ def __init__(self, host, port,
820842
recv.restype = ctypes.c_int
821843
self.host = host
822844
self.port = port
845+
self.socket_fd = socket_fd
823846
self.user = user
824847
self.password = password
825848
self.socket_timeout = socket_timeout
@@ -897,10 +920,37 @@ def connect_basic(self):
897920
:meta private:
898921
"""
899922

900-
if self.host is None:
901-
self.connect_unix()
902-
else:
923+
if self.socket_fd is not None:
924+
self.connect_socket_fd()
925+
elif self.host is not None:
903926
self.connect_tcp()
927+
else:
928+
self.connect_unix()
929+
930+
def connect_socket_fd(self):
931+
"""
932+
Establish a connection using an existing socket fd.
933+
934+
+---------------------+--------------------------+-------------------------+
935+
| socket_fd / timeout | >= 0 | `None` |
936+
+=====================+==========================+=========================+
937+
| blocking | Set non-blocking socket | Don't change, `select` |
938+
| | lib call `select` | isn't needed |
939+
+---------------------+--------------------------+-------------------------+
940+
| non-blocking | Don't change, socket lib | Don't change, call |
941+
| | call `select` | `select` ourselves |
942+
+---------------------+--------------------------+-------------------------+
943+
944+
:meta private:
945+
"""
946+
947+
self.connected = True
948+
if self._socket:
949+
self._socket.close()
950+
951+
self._socket = socket.socket(fileno=self.socket_fd)
952+
if self.socket_timeout is not None:
953+
self._socket.settimeout(self.socket_timeout)
904954

905955
def connect_tcp(self):
906956
"""
@@ -1124,6 +1174,11 @@ def _recv(self, to_read):
11241174
while to_read > 0:
11251175
try:
11261176
tmp = self._socket.recv(to_read)
1177+
except BlockingIOError:
1178+
ready, _, _ = select.select([self._socket.fileno()], [], [], self.socket_timeout)
1179+
if not ready:
1180+
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
1181+
continue
11271182
except OverflowError as exc:
11281183
self._socket.close()
11291184
err = socket.error(
@@ -1163,6 +1218,41 @@ def _read_response(self):
11631218
# Read the packet
11641219
return self._recv(length)
11651220

1221+
def _sendall(self, bytes_to_send):
1222+
"""
1223+
Sends bytes to the transport (socket).
1224+
1225+
:param bytes_to_send: Message to send.
1226+
:type bytes_to_send: :obj:`bytes`
1227+
1228+
:raise: :exc:`~tarantool.error.NetworkError`
1229+
1230+
:meta private:
1231+
"""
1232+
1233+
total_sent = 0
1234+
while total_sent < len(bytes_to_send):
1235+
try:
1236+
sent = self._socket.send(bytes_to_send[total_sent:])
1237+
if sent == 0:
1238+
err = socket.error(
1239+
errno.ECONNRESET,
1240+
"Lost connection to server during query"
1241+
)
1242+
raise NetworkError(err)
1243+
total_sent += sent
1244+
except BlockingIOError as exc:
1245+
total_sent += exc.characters_written
1246+
_, ready, _ = select.select([], [self._socket.fileno()], [], self.socket_timeout)
1247+
if not ready:
1248+
raise NetworkError(TimeoutError()) # pylint: disable=raise-missing-from
1249+
except socket.error as exc:
1250+
err = socket.error(
1251+
errno.ECONNRESET,
1252+
"Lost connection to server during query"
1253+
)
1254+
raise NetworkError(err) from exc
1255+
11661256
def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11671257
"""
11681258
Send request without trying to reconnect.
@@ -1191,7 +1281,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11911281
response = None
11921282
while True:
11931283
try:
1194-
self._socket.sendall(bytes(request))
1284+
self._sendall(bytes(request))
11951285
response = request.response_class(self, self._read_response())
11961286
break
11971287
except SchemaReloadException as exc:

tarantool/connection_pool.py

+20-9
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class PoolUnit():
115115

116116
addr: dict
117117
"""
118-
``{"host": host, "port": port}`` info.
118+
``{"host": host, "port": port, "socket_fd": socket_fd}`` info.
119119
120120
:type: :obj:`dict`
121121
"""
@@ -161,6 +161,14 @@ class PoolUnit():
161161
:type: :obj:`bool`
162162
"""
163163

164+
def get_address(self):
165+
"""
166+
Get an address string representation.
167+
"""
168+
if self.addr['socket_fd'] is not None:
169+
return f'fd://{self.addr["socket_fd"]}'
170+
return f'{self.addr["host"]}:{self.addr["port"]}'
171+
164172

165173
# Based on https://realpython.com/python-interface/
166174
class StrategyInterface(metaclass=abc.ABCMeta):
@@ -398,6 +406,7 @@ def __init__(self,
398406
{
399407
"host': "str" or None, # mandatory
400408
"port": int or "str", # mandatory
409+
"socket_fd": int, # optional
401410
"transport": "str", # optional
402411
"ssl_key_file": "str", # optional
403412
"ssl_cert_file": "str", # optional
@@ -499,6 +508,7 @@ def __init__(self,
499508
conn=Connection(
500509
host=addr['host'],
501510
port=addr['port'],
511+
socket_fd=addr['socket_fd'],
502512
user=user,
503513
password=password,
504514
socket_timeout=socket_timeout,
@@ -529,15 +539,16 @@ def _make_key(self, addr):
529539
"""
530540
Make a unique key for a server based on its address.
531541
532-
:param addr: `{"host": host, "port": port}` dictionary.
542+
:param addr: `{"host": host, "port": port, "socket_fd": socket_fd}` dictionary.
533543
:type addr: :obj:`dict`
534544
535545
:rtype: :obj:`str`
536546
537547
:meta private:
538548
"""
539-
540-
return f"{addr['host']}:{addr['port']}"
549+
if addr['socket_fd'] is None:
550+
return f"{addr['host']}:{addr['port']}"
551+
return addr['socket_fd']
541552

542553
def _get_new_state(self, unit):
543554
"""
@@ -557,23 +568,23 @@ def _get_new_state(self, unit):
557568
try:
558569
conn.connect()
559570
except NetworkError as exc:
560-
msg = (f"Failed to connect to {unit.addr['host']}:{unit.addr['port']}, "
571+
msg = (f"Failed to connect to {unit.get_address()}, "
561572
f"reason: {repr(exc)}")
562573
warn(msg, ClusterConnectWarning)
563574
return InstanceState(Status.UNHEALTHY)
564575

565576
try:
566577
resp = conn.call('box.info')
567578
except NetworkError as exc:
568-
msg = (f"Failed to get box.info for {unit.addr['host']}:{unit.addr['port']}, "
579+
msg = (f"Failed to get box.info for {unit.get_address()}, "
569580
f"reason: {repr(exc)}")
570581
warn(msg, PoolTolopogyWarning)
571582
return InstanceState(Status.UNHEALTHY)
572583

573584
try:
574585
read_only = resp.data[0]['ro']
575586
except (IndexError, KeyError) as exc:
576-
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
587+
msg = (f"Incorrect box.info response from {unit.get_address()}"
577588
f"reason: {repr(exc)}")
578589
warn(msg, PoolTolopogyWarning)
579590
return InstanceState(Status.UNHEALTHY)
@@ -582,11 +593,11 @@ def _get_new_state(self, unit):
582593
status = resp.data[0]['status']
583594

584595
if status != 'running':
585-
msg = f"{unit.addr['host']}:{unit.addr['port']} instance status is not 'running'"
596+
msg = f"{unit.get_address()} instance status is not 'running'"
586597
warn(msg, PoolTolopogyWarning)
587598
return InstanceState(Status.UNHEALTHY)
588599
except (IndexError, KeyError) as exc:
589-
msg = (f"Incorrect box.info response from {unit.addr['host']}:{unit.addr['port']}"
600+
msg = (f"Incorrect box.info response from {unit.get_address()}"
590601
f"reason: {repr(exc)}")
591602
warn(msg, PoolTolopogyWarning)
592603
return InstanceState(Status.UNHEALTHY)

tarantool/const.py

+6
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@
103103
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES = 5
104104
IPROTO_FEATURE_WATCH_ONCE = 6
105105

106+
# Default value for host.
107+
DEFAULT_HOST = None
108+
# Default value for port.
109+
DEFAULT_PORT = None
110+
# Default value for socket_fd.
111+
DEFAULT_SOCKET_FD = None
106112
# Default value for connection timeout (seconds)
107113
CONNECTION_TIMEOUT = None
108114
# Default value for socket timeout (seconds)

0 commit comments

Comments
 (0)