Skip to content

WIP #493

Draft: wants to merge 2 commits into master
5 changes: 1 addition & 4 deletions benchmarks/callback_full_pipeline.py
@@ -49,10 +49,7 @@ def insert_next(self, previous_result=sentinel):
     def run(self):
         self.start_profile()
 
-        if self.protocol_version >= 3:
-            concurrency = 1000
-        else:
-            concurrency = 100
+        concurrency = 1000
 
         for _ in range(min(concurrency, self.num_queries)):
             self.insert_next()
12 changes: 1 addition & 11 deletions cassandra/__init__.py
@@ -135,16 +135,6 @@ class ProtocolVersion(object):
"""
Defines native protocol versions supported by this driver.
"""
V1 = 1
"""
v1, supported in Cassandra 1.2-->2.2
"""

V2 = 2
"""
v2, supported in Cassandra 2.0-->2.2;
added support for lightweight transactions, batch operations, and automatic query paging.
"""

V3 = 3
"""
Expand Down Expand Up @@ -180,7 +170,7 @@ class ProtocolVersion(object):
DSE private protocol v2, supported in DSE 6.0+
"""

SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3, V2, V1)
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
Review comment from Copilot AI (Jun 29, 2025) on the SUPPORTED_VERSIONS line:

The SUPPORTED_VERSIONS tuple no longer includes V2 and V1, but get_supported_protocol_versions can still return versions 1 and 2. This mismatch may cause negotiation of unsupported protocols; please align both lists or enforce protocol >= 3.

Reply from the PR author:

OK, makes sense.

"""
A tuple of all supported protocol versions
"""
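Following up on the Copilot comment above: one way to keep the two lists from drifting is to derive the negotiable versions directly from ProtocolVersion.SUPPORTED_VERSIONS. This is only a sketch; where get_supported_protocol_versions actually lives and how it is built today is not shown in this diff, so the placement and body below are assumptions.

```python
# Illustrative sketch only, not the driver's implementation: derive the
# negotiable protocol versions from ProtocolVersion.SUPPORTED_VERSIONS so
# that removing V1/V2 there also removes them from negotiation.
from cassandra import ProtocolVersion

def get_supported_protocol_versions():
    # With V1 and V2 gone from SUPPORTED_VERSIONS, nothing below V3
    # (or the DSE versions) can be offered during negotiation.
    return ProtocolVersion.SUPPORTED_VERSIONS
```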
106 changes: 0 additions & 106 deletions cassandra/cluster.py
@@ -1452,18 +1452,6 @@

         self._user_types = defaultdict(dict)
 
-        self._min_requests_per_connection = {
-            HostDistance.LOCAL_RACK: DEFAULT_MIN_REQUESTS,
-            HostDistance.LOCAL: DEFAULT_MIN_REQUESTS,
-            HostDistance.REMOTE: DEFAULT_MIN_REQUESTS
-        }
-
-        self._max_requests_per_connection = {
-            HostDistance.LOCAL_RACK: DEFAULT_MAX_REQUESTS,
-            HostDistance.LOCAL: DEFAULT_MAX_REQUESTS,
-            HostDistance.REMOTE: DEFAULT_MAX_REQUESTS
-        }
-
         self._core_connections_per_host = {
             HostDistance.LOCAL_RACK: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
             HostDistance.LOCAL: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
@@ -1666,48 +1654,6 @@
         if not_done:
             raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")
 
-    def get_min_requests_per_connection(self, host_distance):
-        return self._min_requests_per_connection[host_distance]
-
-    def set_min_requests_per_connection(self, host_distance, min_requests):
-        """
-        Sets a threshold for concurrent requests per connection, below which
-        connections will be considered for disposal (down to core connections;
-        see :meth:`~Cluster.set_core_connections_per_host`).
-
-        Pertains to connection pool management in protocol versions {1,2}.
-        """
-        if self.protocol_version >= 3:
-            raise UnsupportedOperation(
-                "Cluster.set_min_requests_per_connection() only has an effect "
-                "when using protocol_version 1 or 2.")
-        if min_requests < 0 or min_requests > 126 or \
-                min_requests >= self._max_requests_per_connection[host_distance]:
-            raise ValueError("min_requests must be 0-126 and less than the max_requests for this host_distance (%d)" %
-                             (self._min_requests_per_connection[host_distance],))
-        self._min_requests_per_connection[host_distance] = min_requests
-
-    def get_max_requests_per_connection(self, host_distance):
-        return self._max_requests_per_connection[host_distance]
-
-    def set_max_requests_per_connection(self, host_distance, max_requests):
-        """
-        Sets a threshold for concurrent requests per connection, above which new
-        connections will be created to a host (up to max connections;
-        see :meth:`~Cluster.set_max_connections_per_host`).
-
-        Pertains to connection pool management in protocol versions {1,2}.
-        """
-        if self.protocol_version >= 3:
-            raise UnsupportedOperation(
-                "Cluster.set_max_requests_per_connection() only has an effect "
-                "when using protocol_version 1 or 2.")
-        if max_requests < 1 or max_requests > 127 or \
-                max_requests <= self._min_requests_per_connection[host_distance]:
-            raise ValueError("max_requests must be 1-127 and greater than the min_requests for this host_distance (%d)" %
-                             (self._min_requests_per_connection[host_distance],))
-        self._max_requests_per_connection[host_distance] = max_requests
-
     def get_core_connections_per_host(self, host_distance):
         """
         Gets the minimum number of connections per Session that will be opened
@@ -1720,31 +1666,6 @@
"""
return self._core_connections_per_host[host_distance]

def set_core_connections_per_host(self, host_distance, core_connections):
"""
Sets the minimum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
:attr:`~HostDistance.REMOTE`.

Protocol version 1 and 2 are limited in the number of concurrent
requests they can send per connection. The driver implements connection
pooling to support higher levels of concurrency.

If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
is not supported (there is always one connection per host, unless
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
and using this will result in an :exc:`~.UnsupportedOperation`.
"""
if self.protocol_version >= 3:
raise UnsupportedOperation(
"Cluster.set_core_connections_per_host() only has an effect "
"when using protocol_version 1 or 2.")
old = self._core_connections_per_host[host_distance]
self._core_connections_per_host[host_distance] = core_connections
if old < core_connections:
self._ensure_core_connections()

def get_max_connections_per_host(self, host_distance):
"""
Gets the maximum number of connections per Session that will be opened
@@ -1757,24 +1678,6 @@
"""
return self._max_connections_per_host[host_distance]

def set_max_connections_per_host(self, host_distance, max_connections):
"""
Sets the maximum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
:attr:`~HostDistance.REMOTE`.

If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
is not supported (there is always one connection per host, unless
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
and using this will result in an :exc:`~.UnsupportedOperation`.
"""
if self.protocol_version >= 3:
raise UnsupportedOperation(
"Cluster.set_max_connections_per_host() only has an effect "
"when using protocol_version 1 or 2.")
self._max_connections_per_host[host_distance] = max_connections

def connection_factory(self, endpoint, host_conn = None, *args, **kwargs):
"""
Called to create a new connection with proper configuration.
@@ -2292,15 +2195,6 @@
         with self._listener_lock:
             return self._listeners.copy()
 
-    def _ensure_core_connections(self):
-        """
-        If any host has fewer than the configured number of core connections
-        open, attempt to open connections until that number is met.
-        """
-        for session in tuple(self.sessions):
-            for pool in tuple(session._pools.values()):
-                pool.ensure_core_connections()
-
     @staticmethod
     def _validate_refresh_schema(keyspace, table, usertype, function, aggregate):
         if any((table, usertype, function, aggregate)):
@@ -4489,7 +4383,7 @@
                     self._scheduled_tasks.discard(task)
                     fn, args, kwargs = task
                     kwargs = dict(kwargs)
                     future = self._executor.submit(fn, *args, **kwargs)
                     future.add_done_callback(self._log_if_failed)
                 else:
                     self._queue.put_nowait((run_at, i, task))

Check failure annotation on line 4386 in cassandra/cluster.py (GitHub Actions / test asyncio (3.9)): cannot schedule new futures after shutdown
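For context on the check failure noted above: "cannot schedule new futures after shutdown" is the standard RuntimeError raised when work is submitted to a concurrent.futures executor that has already been shut down. A tiny standalone reproduction of that failure mode, deliberately unrelated to the driver's scheduler internals:

```python
# Standalone reproduction of the CI error message; not driver code.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown(wait=True)
try:
    executor.submit(print, "too late")
except RuntimeError as exc:
    print("caught:", exc)  # "cannot schedule new futures after shutdown"
```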
8 changes: 1 addition & 7 deletions cassandra/concurrent.py
@@ -33,13 +33,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
     ``parameters`` item must be a sequence or :const:`None`.
 
     The `concurrency` parameter controls how many statements will be executed
-    concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
-    it is recommended that this be kept below 100 times the number of
-    core connections per host times the number of connected hosts (see
-    :meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
-    the event loop thread may attempt to block on new connection creation,
-    substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
-    is 3 or higher, you can safely experiment with higher levels of concurrency.
+    concurrently.
 
     If `raise_on_first_error` is left as :const:`True`, execution will stop
     after the first failed statement and the corresponding exception will be
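Since the docstring now drops the protocol v1/v2 sizing guidance, here is a short usage sketch of execute_concurrent at a higher concurrency. The contact point, keyspace, and table below are placeholders for illustration only.

```python
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent

# Placeholder contact point, keyspace, and table.
cluster = Cluster(["127.0.0.1"])
session = cluster.connect("example_ks")
insert = session.prepare("INSERT INTO events (id, payload) VALUES (?, ?)")

statements_and_params = [(insert, (i, "payload-%d" % i)) for i in range(10000)]

# Protocol v3+ multiplexes many requests per connection, so larger
# concurrency values are reasonable here.
results = execute_concurrent(session, statements_and_params,
                             concurrency=1000, raise_on_first_error=False)
for success, result_or_exc in results:
    if not success:
        print("query failed:", result_or_exc)
```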
23 changes: 8 additions & 15 deletions cassandra/connection.py
@@ -123,7 +123,6 @@ def decompress(byts):
 DEFAULT_LOCAL_PORT_LOW = 49152
 DEFAULT_LOCAL_PORT_HIGH = 65535
 
-frame_header_v1_v2 = struct.Struct('>BbBi')
 frame_header_v3 = struct.Struct('>BhBi')
 
 
@@ -817,17 +816,12 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
         if not self.ssl_context and self.ssl_options:
             self.ssl_context = self._build_ssl_context_from_options()
 
-        if protocol_version >= 3:
-            self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
-            # Don't fill the deque with 2**15 items right away. Start with some and add
-            # more if needed.
-            initial_size = min(300, self.max_in_flight)
-            self.request_ids = deque(range(initial_size))
-            self.highest_request_id = initial_size - 1
-        else:
-            self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1)
-            self.request_ids = deque(range(self.max_request_id + 1))
-            self.highest_request_id = self.max_request_id
+        self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
+        # Don't fill the deque with 2**15 items right away. Start with some and add
+        # more if needed.
+        initial_size = min(300, self.max_in_flight)
+        self.request_ids = deque(range(initial_size))
+        self.highest_request_id = initial_size - 1
 
         self.lock = RLock()
         self.connected_event = Event()
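A standalone sketch of the lazy stream-id pool the retained branch above sets up: ids are handed out from the deque, new ids are minted on demand up to max_request_id ((2 ** 15) - 1 with the v3+ two-byte stream field), and completed ids are returned to the pool. Class and method names are illustrative, not the driver's internals.

```python
from collections import deque

class StreamIdPool(object):
    """Illustrative lazy-growth id pool; mirrors the idea, not the driver code."""

    def __init__(self, max_in_flight=32768):
        self.max_request_id = min(max_in_flight - 1, (2 ** 15) - 1)
        # Seed a small pool up front; grow only if demand requires it.
        initial_size = min(300, max_in_flight)
        self.request_ids = deque(range(initial_size))
        self.highest_request_id = initial_size - 1

    def acquire(self):
        if self.request_ids:
            return self.request_ids.popleft()
        if self.highest_request_id < self.max_request_id:
            self.highest_request_id += 1
            return self.highest_request_id
        raise RuntimeError("all stream ids are in use")

    def release(self, stream_id):
        self.request_ids.append(stream_id)
```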
@@ -1205,11 +1199,10 @@ def _read_frame_header(self):
             version = buf[0] & PROTOCOL_VERSION_MASK
             if version not in ProtocolVersion.SUPPORTED_VERSIONS:
                 raise ProtocolError("This version of the driver does not support protocol version %d" % version)
-            frame_header = frame_header_v3 if version >= 3 else frame_header_v1_v2
             # this frame header struct is everything after the version byte
-            header_size = frame_header.size + 1
+            header_size = frame_header_v3.size + 1
             if pos >= header_size:
-                flags, stream, op, body_len = frame_header.unpack_from(buf, 1)
+                flags, stream, op, body_len = frame_header_v3.unpack_from(buf, 1)
                 if body_len < 0:
                     raise ProtocolError("Received negative body length: %r" % body_len)
                 self._current_frame = _Frame(version, flags, stream, op, header_size, body_len + header_size)
42 changes: 15 additions & 27 deletions cassandra/cqltypes.py
@@ -812,18 +812,13 @@ class _SimpleParameterizedType(_ParameterizedType):
     @classmethod
     def deserialize_safe(cls, byts, protocol_version):
         subtype, = cls.subtypes
-        if protocol_version >= 3:
-            unpack = int32_unpack
-            length = 4
-        else:
-            unpack = uint16_unpack
-            length = 2
-        numelements = unpack(byts[:length])
+        length = 4
+        numelements = int32_unpack(byts[:length])
         p = length
         result = []
         inner_proto = max(3, protocol_version)
         for _ in range(numelements):
-            itemlen = unpack(byts[p:p + length])
+            itemlen = int32_unpack(byts[p:p + length])
             p += length
             if itemlen < 0:
                 result.append(None)
@@ -839,16 +834,15 @@ def serialize_safe(cls, items, protocol_version):
             raise TypeError("Received a string for a type that expects a sequence")
 
         subtype, = cls.subtypes
-        pack = int32_pack if protocol_version >= 3 else uint16_pack
         buf = io.BytesIO()
-        buf.write(pack(len(items)))
+        buf.write(int32_pack(len(items)))
         inner_proto = max(3, protocol_version)
         for item in items:
            if item is None:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))
            else:
                itembytes = subtype.to_binary(item, inner_proto)
-                buf.write(pack(len(itembytes)))
+                buf.write(int32_pack(len(itembytes)))
                buf.write(itembytes)
         return buf.getvalue()
 
@@ -872,18 +866,13 @@ class MapType(_ParameterizedType):
     @classmethod
     def deserialize_safe(cls, byts, protocol_version):
         key_type, value_type = cls.subtypes
-        if protocol_version >= 3:
-            unpack = int32_unpack
-            length = 4
-        else:
-            unpack = uint16_unpack
-            length = 2
-        numelements = unpack(byts[:length])
+        length = 4
+        numelements = int32_unpack(byts[:length])
         p = length
         themap = util.OrderedMapSerializedKey(key_type, protocol_version)
         inner_proto = max(3, protocol_version)
         for _ in range(numelements):
-            key_len = unpack(byts[p:p + length])
+            key_len = int32_unpack(byts[p:p + length])
             p += length
             if key_len < 0:
                 keybytes = None
@@ -893,7 +882,7 @@ def deserialize_safe(cls, byts, protocol_version):
                 p += key_len
             key = key_type.from_binary(keybytes, inner_proto)
 
-            val_len = unpack(byts[p:p + length])
+            val_len = int32_unpack(byts[p:p + length])
             p += length
             if val_len < 0:
                 val = None
@@ -908,9 +897,8 @@ def deserialize_safe(cls, byts, protocol_version):
     @classmethod
     def serialize_safe(cls, themap, protocol_version):
         key_type, value_type = cls.subtypes
-        pack = int32_pack if protocol_version >= 3 else uint16_pack
         buf = io.BytesIO()
-        buf.write(pack(len(themap)))
+        buf.write(int32_pack(len(themap)))
         try:
             items = themap.items()
         except AttributeError:
@@ -919,16 +907,16 @@ def serialize_safe(cls, themap, protocol_version):
         for key, val in items:
             if key is not None:
                 keybytes = key_type.to_binary(key, inner_proto)
-                buf.write(pack(len(keybytes)))
+                buf.write(int32_pack(len(keybytes)))
                 buf.write(keybytes)
             else:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))
             if val is not None:
                 valbytes = value_type.to_binary(val, inner_proto)
-                buf.write(pack(len(valbytes)))
+                buf.write(int32_pack(len(valbytes)))
                 buf.write(valbytes)
             else:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))
         return buf.getvalue()
 
 
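For reference, the v3+ collection framing that the simplified code above relies on: a 4-byte signed element count, then for each element a 4-byte signed length (-1 encodes NULL) followed by the element bytes. A standalone illustration using struct, not the driver's own code path:

```python
import struct

def serialize_text_list_v3(items):
    # [int count][int len][bytes]... with len == -1 for NULL, as in protocol v3+.
    out = bytearray()
    out += struct.pack('>i', len(items))
    for item in items:
        if item is None:
            out += struct.pack('>i', -1)
        else:
            encoded = item.encode('utf-8')
            out += struct.pack('>i', len(encoded))
            out += encoded
    return bytes(out)

# ['a', None, 'bb'] -> count=3, then (1, 'a'), (-1 for NULL), (2, 'bb')
assert serialize_text_list_v3(['a', None, 'bb']) == (
    b'\x00\x00\x00\x03'
    b'\x00\x00\x00\x01a'
    b'\xff\xff\xff\xff'
    b'\x00\x00\x00\x02bb'
)
```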
7 changes: 1 addition & 6 deletions cassandra/metadata.py
@@ -153,12 +153,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
                 meta = parse_method(self.keyspaces, **kwargs)
                 if meta:
                     update_method = getattr(self, '_update_' + tt_lower)
-                    if tt_lower == 'keyspace' and connection.protocol_version < 3:
-                        # we didn't have 'type' target in legacy protocol versions, so we need to query those too
-                        user_types = parser.get_types_map(self.keyspaces, **kwargs)
-                        self._update_keyspace(meta, user_types)
-                    else:
-                        update_method(meta)
+                    update_method(meta)
                 else:
                     drop_method = getattr(self, '_drop_' + tt_lower)
                     drop_method(**kwargs)