Skip to content

Commit 203c576

Browse files
committed
WIP
1 parent 3f7bcbb commit 203c576

17 files changed

+84
-295
lines changed

benchmarks/callback_full_pipeline.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ def insert_next(self, previous_result=sentinel):
4949
def run(self):
5050
self.start_profile()
5151

52-
if self.protocol_version >= 3:
53-
concurrency = 1000
54-
else:
55-
concurrency = 100
52+
concurrency = 1000
5653

5754
for _ in range(min(concurrency, self.num_queries)):
5855
self.insert_next()

cassandra/__init__.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,6 @@ class ProtocolVersion(object):
135135
"""
136136
Defines native protocol versions supported by this driver.
137137
"""
138-
V1 = 1
139-
"""
140-
v1, supported in Cassandra 1.2-->2.2
141-
"""
142-
143-
V2 = 2
144-
"""
145-
v2, supported in Cassandra 2.0-->2.2;
146-
added support for lightweight transactions, batch operations, and automatic query paging.
147-
"""
148138

149139
V3 = 3
150140
"""
@@ -180,7 +170,7 @@ class ProtocolVersion(object):
180170
DSE private protocol v2, supported in DSE 6.0+
181171
"""
182172

183-
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3, V2, V1)
173+
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
184174
"""
185175
A tuple of all supported protocol versions
186176
"""

cassandra/cluster.py

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,18 +1452,6 @@ def __init__(self,
14521452

14531453
self._user_types = defaultdict(dict)
14541454

1455-
self._min_requests_per_connection = {
1456-
HostDistance.LOCAL_RACK: DEFAULT_MIN_REQUESTS,
1457-
HostDistance.LOCAL: DEFAULT_MIN_REQUESTS,
1458-
HostDistance.REMOTE: DEFAULT_MIN_REQUESTS
1459-
}
1460-
1461-
self._max_requests_per_connection = {
1462-
HostDistance.LOCAL_RACK: DEFAULT_MAX_REQUESTS,
1463-
HostDistance.LOCAL: DEFAULT_MAX_REQUESTS,
1464-
HostDistance.REMOTE: DEFAULT_MAX_REQUESTS
1465-
}
1466-
14671455
self._core_connections_per_host = {
14681456
HostDistance.LOCAL_RACK: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
14691457
HostDistance.LOCAL: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
@@ -1666,48 +1654,6 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
16661654
if not_done:
16671655
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")
16681656

1669-
def get_min_requests_per_connection(self, host_distance):
1670-
return self._min_requests_per_connection[host_distance]
1671-
1672-
def set_min_requests_per_connection(self, host_distance, min_requests):
1673-
"""
1674-
Sets a threshold for concurrent requests per connection, below which
1675-
connections will be considered for disposal (down to core connections;
1676-
see :meth:`~Cluster.set_core_connections_per_host`).
1677-
1678-
Pertains to connection pool management in protocol versions {1,2}.
1679-
"""
1680-
if self.protocol_version >= 3:
1681-
raise UnsupportedOperation(
1682-
"Cluster.set_min_requests_per_connection() only has an effect "
1683-
"when using protocol_version 1 or 2.")
1684-
if min_requests < 0 or min_requests > 126 or \
1685-
min_requests >= self._max_requests_per_connection[host_distance]:
1686-
raise ValueError("min_requests must be 0-126 and less than the max_requests for this host_distance (%d)" %
1687-
(self._min_requests_per_connection[host_distance],))
1688-
self._min_requests_per_connection[host_distance] = min_requests
1689-
1690-
def get_max_requests_per_connection(self, host_distance):
1691-
return self._max_requests_per_connection[host_distance]
1692-
1693-
def set_max_requests_per_connection(self, host_distance, max_requests):
1694-
"""
1695-
Sets a threshold for concurrent requests per connection, above which new
1696-
connections will be created to a host (up to max connections;
1697-
see :meth:`~Cluster.set_max_connections_per_host`).
1698-
1699-
Pertains to connection pool management in protocol versions {1,2}.
1700-
"""
1701-
if self.protocol_version >= 3:
1702-
raise UnsupportedOperation(
1703-
"Cluster.set_max_requests_per_connection() only has an effect "
1704-
"when using protocol_version 1 or 2.")
1705-
if max_requests < 1 or max_requests > 127 or \
1706-
max_requests <= self._min_requests_per_connection[host_distance]:
1707-
raise ValueError("max_requests must be 1-127 and greater than the min_requests for this host_distance (%d)" %
1708-
(self._min_requests_per_connection[host_distance],))
1709-
self._max_requests_per_connection[host_distance] = max_requests
1710-
17111657
def get_core_connections_per_host(self, host_distance):
17121658
"""
17131659
Gets the minimum number of connections per Session that will be opened

cassandra/connection.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ def decompress(byts):
123123
DEFAULT_LOCAL_PORT_LOW = 49152
124124
DEFAULT_LOCAL_PORT_HIGH = 65535
125125

126-
frame_header_v1_v2 = struct.Struct('>BbBi')
127126
frame_header_v3 = struct.Struct('>BhBi')
128127

129128

@@ -817,17 +816,12 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
817816
if not self.ssl_context and self.ssl_options:
818817
self.ssl_context = self._build_ssl_context_from_options()
819818

820-
if protocol_version >= 3:
821-
self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
822-
# Don't fill the deque with 2**15 items right away. Start with some and add
823-
# more if needed.
824-
initial_size = min(300, self.max_in_flight)
825-
self.request_ids = deque(range(initial_size))
826-
self.highest_request_id = initial_size - 1
827-
else:
828-
self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1)
829-
self.request_ids = deque(range(self.max_request_id + 1))
830-
self.highest_request_id = self.max_request_id
819+
self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
820+
# Don't fill the deque with 2**15 items right away. Start with some and add
821+
# more if needed.
822+
initial_size = min(300, self.max_in_flight)
823+
self.request_ids = deque(range(initial_size))
824+
self.highest_request_id = initial_size - 1
831825

832826
self.lock = RLock()
833827
self.connected_event = Event()
@@ -1205,11 +1199,10 @@ def _read_frame_header(self):
12051199
version = buf[0] & PROTOCOL_VERSION_MASK
12061200
if version not in ProtocolVersion.SUPPORTED_VERSIONS:
12071201
raise ProtocolError("This version of the driver does not support protocol version %d" % version)
1208-
frame_header = frame_header_v3 if version >= 3 else frame_header_v1_v2
12091202
# this frame header struct is everything after the version byte
1210-
header_size = frame_header.size + 1
1203+
header_size = frame_header_v3.size + 1
12111204
if pos >= header_size:
1212-
flags, stream, op, body_len = frame_header.unpack_from(buf, 1)
1205+
flags, stream, op, body_len = frame_header_v3.unpack_from(buf, 1)
12131206
if body_len < 0:
12141207
raise ProtocolError("Received negative body length: %r" % body_len)
12151208
self._current_frame = _Frame(version, flags, stream, op, header_size, body_len + header_size)

cassandra/cqltypes.py

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -812,18 +812,13 @@ class _SimpleParameterizedType(_ParameterizedType):
812812
@classmethod
813813
def deserialize_safe(cls, byts, protocol_version):
814814
subtype, = cls.subtypes
815-
if protocol_version >= 3:
816-
unpack = int32_unpack
817-
length = 4
818-
else:
819-
unpack = uint16_unpack
820-
length = 2
821-
numelements = unpack(byts[:length])
815+
length = 4
816+
numelements = int32_unpack(byts[:length])
822817
p = length
823818
result = []
824819
inner_proto = max(3, protocol_version)
825820
for _ in range(numelements):
826-
itemlen = unpack(byts[p:p + length])
821+
itemlen = int32_unpack(byts[p:p + length])
827822
p += length
828823
if itemlen < 0:
829824
result.append(None)
@@ -839,16 +834,15 @@ def serialize_safe(cls, items, protocol_version):
839834
raise TypeError("Received a string for a type that expects a sequence")
840835

841836
subtype, = cls.subtypes
842-
pack = int32_pack if protocol_version >= 3 else uint16_pack
843837
buf = io.BytesIO()
844-
buf.write(pack(len(items)))
838+
buf.write(int32_pack(len(items)))
845839
inner_proto = max(3, protocol_version)
846840
for item in items:
847841
if item is None:
848-
buf.write(pack(-1))
842+
buf.write(int32_pack(-1))
849843
else:
850844
itembytes = subtype.to_binary(item, inner_proto)
851-
buf.write(pack(len(itembytes)))
845+
buf.write(int32_pack(len(itembytes)))
852846
buf.write(itembytes)
853847
return buf.getvalue()
854848

@@ -872,18 +866,13 @@ class MapType(_ParameterizedType):
872866
@classmethod
873867
def deserialize_safe(cls, byts, protocol_version):
874868
key_type, value_type = cls.subtypes
875-
if protocol_version >= 3:
876-
unpack = int32_unpack
877-
length = 4
878-
else:
879-
unpack = uint16_unpack
880-
length = 2
881-
numelements = unpack(byts[:length])
869+
length = 4
870+
numelements = int32_unpack(byts[:length])
882871
p = length
883872
themap = util.OrderedMapSerializedKey(key_type, protocol_version)
884873
inner_proto = max(3, protocol_version)
885874
for _ in range(numelements):
886-
key_len = unpack(byts[p:p + length])
875+
key_len = int32_unpack(byts[p:p + length])
887876
p += length
888877
if key_len < 0:
889878
keybytes = None
@@ -893,7 +882,7 @@ def deserialize_safe(cls, byts, protocol_version):
893882
p += key_len
894883
key = key_type.from_binary(keybytes, inner_proto)
895884

896-
val_len = unpack(byts[p:p + length])
885+
val_len = int32_unpack(byts[p:p + length])
897886
p += length
898887
if val_len < 0:
899888
val = None
@@ -908,9 +897,8 @@ def deserialize_safe(cls, byts, protocol_version):
908897
@classmethod
909898
def serialize_safe(cls, themap, protocol_version):
910899
key_type, value_type = cls.subtypes
911-
pack = int32_pack if protocol_version >= 3 else uint16_pack
912900
buf = io.BytesIO()
913-
buf.write(pack(len(themap)))
901+
buf.write(int32_pack(len(themap)))
914902
try:
915903
items = themap.items()
916904
except AttributeError:
@@ -919,16 +907,16 @@ def serialize_safe(cls, themap, protocol_version):
919907
for key, val in items:
920908
if key is not None:
921909
keybytes = key_type.to_binary(key, inner_proto)
922-
buf.write(pack(len(keybytes)))
910+
buf.write(int32_pack(len(keybytes)))
923911
buf.write(keybytes)
924912
else:
925-
buf.write(pack(-1))
913+
buf.write(int32_pack(-1))
926914
if val is not None:
927915
valbytes = value_type.to_binary(val, inner_proto)
928-
buf.write(pack(len(valbytes)))
916+
buf.write(int32_pack(len(valbytes)))
929917
buf.write(valbytes)
930918
else:
931-
buf.write(pack(-1))
919+
buf.write(int32_pack(-1))
932920
return buf.getvalue()
933921

934922

cassandra/metadata.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
153153
meta = parse_method(self.keyspaces, **kwargs)
154154
if meta:
155155
update_method = getattr(self, '_update_' + tt_lower)
156-
if tt_lower == 'keyspace' and connection.protocol_version < 3:
157-
# we didn't have 'type' target in legacy protocol versions, so we need to query those too
158-
user_types = parser.get_types_map(self.keyspaces, **kwargs)
159-
self._update_keyspace(meta, user_types)
160-
else:
161-
update_method(meta)
156+
update_method(meta)
162157
else:
163158
drop_method = getattr(self, '_drop_' + tt_lower)
164159
drop_method(**kwargs)

cassandra/pool.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,6 @@ def borrow_connection(self, timeout, routing_key=None):
990990
# note: it would be nice to push changes to these config settings
991991
# to pools instead of doing a new lookup on every
992992
# borrow_connection() call
993-
max_reqs = self._session.cluster.get_max_requests_per_connection(self.host_distance)
994993
max_conns = self._session.cluster.get_max_connections_per_host(self.host_distance)
995994

996995
least_busy = min(conns, key=lambda c: c.in_flight)
@@ -1012,10 +1011,9 @@ def borrow_connection(self, timeout, routing_key=None):
10121011
# wait_for_conn will increment in_flight on the conn
10131012
least_busy, request_id = self._wait_for_conn(timeout)
10141013

1015-
# if we have too many requests on this connection but we still
1016-
# have space to open a new connection against this host, go ahead
1014+
# if we still have space to open a new connection against this host, go ahead
10171015
# and schedule the creation of a new connection
1018-
if least_busy.in_flight >= max_reqs and len(self._connections) < max_conns:
1016+
if len(self._connections) < max_conns:
10191017
self._maybe_spawn_new_connection()
10201018

10211019
return least_busy, request_id
@@ -1144,11 +1142,10 @@ def return_connection(self, connection, stream_was_orphaned=False):
11441142
return
11451143

11461144
core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
1147-
min_reqs = self._session.cluster.get_min_requests_per_connection(self.host_distance)
11481145
# we can use in_flight here without holding the connection lock
11491146
# because the fact that in_flight dipped below the min at some
11501147
# point is enough to start the trashing procedure
1151-
if len(self._connections) > core_conns and in_flight <= min_reqs and \
1148+
if len(self._connections) > core_conns and \
11521149
time.time() >= self._next_trash_allowed_at:
11531150
self._maybe_trash_connection(connection)
11541151
else:

0 commit comments

Comments
 (0)