
After a node restarts, driver reconnects multiple times, causing queries to fail #295

Open

Description

@kbr-scylla

In test.py tests, after restarting a node, we often use the utility function wait_for_cql (or wait_for_cql_and_get_hosts) on a pre-existing driver session (created at the beginning of the test and connected to the cluster) to wait until that session can query the restarted node.

This is the logic:

async def wait_for(
        pred: Callable[[], Awaitable[Optional[T]]],
        deadline: float,
        period: float = 1,
        before_retry: Optional[Callable[[], Any]] = None) -> T:
    while True:
        assert(time.time() < deadline), "Deadline exceeded, failing test."
        res = await pred()
        if res is not None:
            return res
        await asyncio.sleep(period)
        if before_retry:
            before_retry()


async def wait_for_cql(cql: Session, host: Host, deadline: float) -> None:
    async def cql_ready():
        try:
            await cql.run_async("select * from system.local", host=host)
        except NoHostAvailable:
            logging.info(f"Driver not connected to {host} yet")
            return None
        return True
    await wait_for(cql_ready, deadline)

async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], deadline: float) \
        -> list[Host]:
    """Wait until every server in `servers` is available through `cql`
       and translate `servers` to a list of Cassandra `Host`s.
    """
    ip_set = set(str(srv.rpc_address) for srv in servers)
    async def get_hosts() -> Optional[list[Host]]:
        hosts = cql.cluster.metadata.all_hosts()
        remaining = ip_set - {h.address for h in hosts}
        if not remaining:
            return hosts

        logging.info(f"Driver hasn't yet learned about hosts: {remaining}")
        return None
    hosts = await wait_for(
        pred=get_hosts,
        deadline=deadline,
        before_retry=lambda: cql.cluster.refresh_nodes(force_token_rebuild=True),
    )

    # Take only hosts from `ip_set` (there may be more)
    hosts = [h for h in hosts if h.address in ip_set]
    await asyncio.gather(*(wait_for_cql(cql, h, deadline) for h in hosts))

    return hosts

In short, we're waiting until select * from system.local succeeds on that Host. wait_for_cql_and_get_hosts additionally translates an IP address in the form of a string into an object of the Host type (defined by the driver).

Then a test may attempt to perform a query -- immediately after calling wait_for_cql / wait_for_cql_and_get_hosts.

Unfortunately, that query may fail.
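
For illustration, the pattern in a test looks roughly like this (a minimal sketch; server and the 60-second deadline are placeholders, not taken from a specific test):

hosts = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
# The very next query may still raise NoHostAvailable if the driver
# decides at this exact moment to drop its connections and reconnect.
await cql.run_async("select * from system.local", host=hosts[0])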

The reason (judging from the driver logs) is that even after the driver has reconnected and select * from system.local has succeeded, the driver may for some reason decide to reconnect again, which involves dropping the existing connections and creating new ones. This can happen in the middle of the next query, causing that query to fail. In fact, the driver may reconnect multiple times in this way.

Here's a fragment of relevant logs from one of test.py test runs:

10:04:15.630 INFO> Server Server(597, 127.167.175.35, 127.167.175.35) restarted
10:04:15.645 DEBUG> Connection 281470374861840: '('127.0.0.1', 34094)' -> '('127.167.175.35', 9042)'
10:04:15.645 DEBUG> Connection 281470374907600: '('127.0.0.1', 34106)' -> '('127.167.175.35', 9042)'
10:04:15.645 DEBUG> Sending initial options message for new connection (281470374861840) to 127.167.175.35:9042
10:04:15.645 DEBUG> Sending initial options message for new connection (281470374907600) to 127.167.175.35:9042
10:04:15.649 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['0'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.650 DEBUG> Received options response on new connection (281470374861840) from 127.167.175.35:9042
10:04:15.650 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.650 DEBUG> Sending StartupMessage on <CustomConnection(281470374861840) 127.167.175.35:9042>
10:04:15.650 DEBUG> Sent StartupMessage on <CustomConnection(281470374861840) 127.167.175.35:9042>
10:04:15.650 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['0'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.650 DEBUG> Received options response on new connection (281470374907600) from 127.167.175.35:9042
10:04:15.651 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.651 DEBUG> Sending StartupMessage on <CustomConnection(281470374907600) 127.167.175.35:9042>
10:04:15.651 DEBUG> Sent StartupMessage on <CustomConnection(281470374907600) 127.167.175.35:9042>
10:04:15.658 DEBUG> Got ReadyMessage on new connection (281470374907600) from 127.167.175.35:9042
10:04:15.658 INFO> Successful reconnection to 127.167.175.35:9042, marking node up if it isn't already
10:04:15.658 DEBUG> Waiting to acquire lock for handling up status of node 127.167.175.35:9042
10:04:15.659 DEBUG> Starting to handle up status of node 127.167.175.35:9042
10:04:15.659 DEBUG> Got ReadyMessage on new connection (281470374861840) from 127.167.175.35:9042
10:04:15.659 INFO> Host 127.167.175.35:9042 may be up; will prepare queries and open connection pool
10:04:15.659 INFO> Successful reconnection to 127.167.175.35:9042, marking node up if it isn't already
10:04:15.659 DEBUG> Now that host 127.167.175.35:9042 is up, cancelling the reconnection handler
10:04:15.659 DEBUG> Waiting to acquire lock for handling up status of node 127.167.175.35:9042
10:04:15.660 DEBUG> Done preparing all queries for host 127.167.175.35:9042, 
10:04:15.660 DEBUG> Starting to handle up status of node 127.167.175.35:9042
10:04:15.660 DEBUG> Removed connection pool for <Host: 127.167.175.35:9042>
10:04:15.660 INFO> Host 127.167.175.35:9042 may be up; will prepare queries and open connection pool
10:04:15.660 DEBUG> Signalling to load balancing policies that host 127.167.175.35:9042 is up
10:04:15.660 DEBUG> Now that host 127.167.175.35:9042 is up, cancelling the reconnection handler
10:04:15.661 DEBUG> Signalling to control connection that host 127.167.175.35:9042 is up
10:04:15.661 DEBUG> Done preparing all queries for host 127.167.175.35:9042, 
10:04:15.661 DEBUG> Attempting to open new connection pools for host 127.167.175.35:9042
10:04:15.661 DEBUG> Signalling to load balancing policies that host 127.167.175.35:9042 is up
10:04:15.661 DEBUG> Closing connection (281470374907600) to 127.167.175.35:9042
10:04:15.663 DEBUG> Signalling to control connection that host 127.167.175.35:9042 is up
10:04:15.664 DEBUG> Attempting to open new connection pools for host 127.167.175.35:9042
10:04:15.664 DEBUG> Closing connection (281470374861840) to 127.167.175.35:9042
10:04:15.665 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.665 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.666 DEBUG> Shutting down connections to 127.167.175.35:9042
10:04:15.666 DEBUG> Initializing connection for host 127.167.175.35:9042
10:04:15.666 DEBUG> Closing connection (281470374626576) to 127.167.175.35:9042
10:04:15.667 DEBUG> Closing connection (281470374626576) to 127.167.175.35:9042
10:04:15.667 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.667 DEBUG> Closing excess connection (281470374625424) to 127.167.175.35:9042
10:04:15.668 DEBUG> Connection 281470371258256: '('127.0.0.1', 34114)' -> '('127.167.175.35', 9042)'
10:04:15.668 DEBUG> Closing connection (281470374625424) to 127.167.175.35:9042
10:04:15.668 DEBUG> Sending initial options message for new connection (281470371258256) to 127.167.175.35:9042
10:04:15.668 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.669 DEBUG> Initializing connection for host 127.167.175.35:9042
10:04:15.670 DEBUG> Connection 281470373696208: '('127.0.0.1', 34116)' -> '('127.167.175.35', 9042)'
10:04:15.670 DEBUG> Sending initial options message for new connection (281470373696208) to 127.167.175.35:9042
10:04:15.677 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['0'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.677 DEBUG> Received options response on new connection (281470373696208) from 127.167.175.35:9042
10:04:15.677 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.677 DEBUG> Sending StartupMessage on <CustomConnection(281470373696208) 127.167.175.35:9042>
10:04:15.677 DEBUG> Sent StartupMessage on <CustomConnection(281470373696208) 127.167.175.35:9042>
10:04:15.678 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['0'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.678 DEBUG> Received options response on new connection (281470371258256) from 127.167.175.35:9042
10:04:15.678 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.678 DEBUG> Sending StartupMessage on <CustomConnection(281470371258256) 127.167.175.35:9042>
10:04:15.678 DEBUG> Sent StartupMessage on <CustomConnection(281470371258256) 127.167.175.35:9042>
10:04:15.683 DEBUG> Got ReadyMessage on new connection (281470371258256) from 127.167.175.35:9042
10:04:15.684 DEBUG> First connection created to 127.167.175.35:9042 for shard_id=0
10:04:15.686 DEBUG> Finished initializing connection for host 127.167.175.35:9042
10:04:15.686 DEBUG> Got ReadyMessage on new connection (281470373696208) from 127.167.175.35:9042
10:04:15.687 DEBUG> Added pool for host 127.167.175.35:9042 to session
10:04:15.687 DEBUG> First connection created to 127.167.175.35:9042 for shard_id=0
10:04:15.687 INFO> Connection pools established for node 127.167.175.35:9042
10:04:15.687 DEBUG> Finished initializing connection for host 127.167.175.35:9042
10:04:15.688 DEBUG> Host 127.167.175.35:9042 is now marked up
10:04:15.688 DEBUG> Added pool for host 127.167.175.35:9042 to session
10:04:15.688 DEBUG> shard_aware_endpoint=None
10:04:15.688 DEBUG> Shutting down connections to 127.167.175.35:9042
10:04:15.689 DEBUG> Closing connection (281470371258256) to 127.167.175.35:9042
10:04:15.689 DEBUG> Closing connection (281470371258256) to 127.167.175.35:9042
10:04:15.689 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.689 INFO> Connection pools established for node 127.167.175.35:9042
10:04:15.689 DEBUG> Host 127.167.175.35:9042 is now marked up
10:04:15.689 DEBUG> shard_aware_endpoint=None
10:04:15.690 DEBUG> Connection 281470371266320: '('127.0.0.1', 34118)' -> '('127.167.175.35', 9042)'
10:04:15.690 DEBUG> Sending initial options message for new connection (281470371266320) to 127.167.175.35:9042
10:04:15.690 DEBUG> Connection 281470371261456: '('127.0.0.1', 34134)' -> '('127.167.175.35', 9042)'
10:04:15.691 DEBUG> Sending initial options message for new connection (281470371261456) to 127.167.175.35:9042
10:04:15.706 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['0'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.707 DEBUG> Received options response on new connection (281470371261456) from 127.167.175.35:9042
10:04:15.707 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.707 DEBUG> Sending StartupMessage on <CustomConnection(281470371261456) 127.167.175.35:9042>
10:04:15.707 DEBUG> Sent StartupMessage on <CustomConnection(281470371261456) 127.167.175.35:9042>
10:04:15.707 DEBUG> Parsing sharding info from message options {'COMPRESSION': ['lz4', 'snappy'], 'SCYLLA_LWT_ADD_METADATA_MARK': ['LWT_OPTIMIZATION_META_BIT_MASK=2147483648'], 'SCYLLA_NR_SHARDS': ['2'], 'SCYLLA_PARTITIONER': ['org.apache.cassandra.dht.Murmur3Partitioner'], 'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=61440'], 'SCYLLA_SHARD': ['1'], 'SCYLLA_SHARDING_ALGORITHM': ['biased-token-round-robin'], 'SCYLLA_SHARDING_IGNORE_MSB': ['12'], 'TABLETS_ROUTING_V1': ['']}
10:04:15.708 DEBUG> Received options response on new connection (281470371266320) from 127.167.175.35:9042
10:04:15.708 DEBUG> No available compression types supported on both ends. locally supported: odict_keys([]). remotely supported: ['lz4', 'snappy']
10:04:15.708 DEBUG> Sending StartupMessage on <CustomConnection(281470371266320) 127.167.175.35:9042>
10:04:15.708 DEBUG> Sent StartupMessage on <CustomConnection(281470371266320) 127.167.175.35:9042>
10:04:15.715 DEBUG> Got ReadyMessage on new connection (281470371266320) from 127.167.175.35:9042
10:04:15.716 DEBUG> Received a connection 281470371266320 for shard_id=1 on host 127.167.175.35:9042
10:04:15.716 DEBUG> Pool for host 127.167.175.35:9042 is in shutdown, closing the new connection (281470371266320)
10:04:15.716 DEBUG> Closing connection (281470371266320) to 127.167.175.35:9042
10:04:15.716 DEBUG> Closed socket to 127.167.175.35:9042
10:04:15.720 DEBUG> Got ReadyMessage on new connection (281470371261456) from 127.167.175.35:9042
10:04:15.724 DEBUG> Received a connection 281470371261456 for shard_id=0 on host 127.167.175.35:9042
10:04:15.725 DEBUG> Putting a connection 281470371261456 to shard 0 to the excess pool of host 127.167.175.35:9042

We can clearly see that the driver managed to reconnect:

10:04:15.658 INFO> Successful reconnection to 127.167.175.35:9042, marking node up if it isn't already
10:04:15.658 DEBUG> Waiting to acquire lock for handling up status of node 127.167.175.35:9042
10:04:15.659 DEBUG> Starting to handle up status of node 127.167.175.35:9042
10:04:15.659 DEBUG> Got ReadyMessage on new connection (281470374861840) from 127.167.175.35:9042

but then, for some reason, it drops those connections again:

10:04:15.661 DEBUG> Attempting to open new connection pools for host 127.167.175.35:9042
10:04:15.661 DEBUG> Signalling to load balancing policies that host 127.167.175.35:9042 is up
10:04:15.661 DEBUG> Closing connection (281470374907600) to 127.167.175.35:9042

One curious thing I noticed is this:

10:04:15.659 DEBUG> Now that host 127.167.175.35:9042 is up, cancelling the reconnection handler

This suggests that there is some kind of reconnection logic -- perhaps reacting to events? -- which is supposed to be cancelled. However, the reconnection still happens.

I suspect that the driver is reacting to notifications from Scylla. Different nodes may notice that our node is UP at different times and send notifications to the driver (and these notifications can also be delayed for whatever reason). Whenever the driver receives such a notification, it triggers a reconnection.

But perhaps that reconnection logic should be cancelled once the driver first manages to reconnect? The message above suggests that such a mechanism is already in place, but apparently it isn't, or it doesn't work. Or perhaps, upon receiving such a notification, the driver should first check whether the existing connections (if any) are alive (exchange a packet back and forth?), and attempt reconnection only if they're not?
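
As a test-side mitigation (not a fix for the driver behaviour itself), the first query after wait_for_cql could be retried on NoHostAvailable until a deadline. A minimal sketch, assuming the same imports and types as the helpers above (the function name and retry period are made up):

async def run_async_with_retry(cql: Session, query: str, host: Host,
                               deadline: float, period: float = 0.5):
    """Retry a query that may hit a transient reconnection right after a node restart."""
    while True:
        try:
            return await cql.run_async(query, host=host)
        except NoHostAvailable:
            assert time.time() < deadline, "Deadline exceeded while retrying query"
            logging.info(f"Query to {host} failed with NoHostAvailable, retrying")
            await asyncio.sleep(period)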
