Skip to content

Python Driver fails to update existing PreparedStatements after table alter #346

@kbr-scylla

Description

@kbr-scylla

The following test written for Scylla's test.py framework fails:

import logging
import pytest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)

@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
    s1 = await manager.server_add()
    cql, _ = await manager.get_ready_cql([s1])

    logger.info("ks")
    await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    await cql.run_async("create table ks.t (pk int primary key, x int)")

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [0, 0])

    await cql.run_async("alter table ks.t alter x type blob")
    #stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    #await cql.run_async(stmt, [0, 0])
    await cql.run_async(stmt, [b'', 0])

with:

E                   TypeError: Received an argument of invalid type for column "x". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'bytes'>; (required argument is not an integer)

In this case (I investigated), the Python driver does not attempt a statement reprepare at all.
This is problem number 1.

If we uncomment:

    await cql.run_async(stmt, [0, 0])

(second-to-last statement),
this triggers a reprepare (which I checked by adding some logs to the driver), because driver sends a request and gets a "query not prepared" response from Scylla. This statement passes. However, the following statement:

    await cql.run_async(stmt, [b'', 0])

still fails, even after repreparation.
This is problem number 2.

If we uncomment the explicit

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")

(after the alter),
then the last statement passes (the one which sends [b'', 0]), whether or not we uncomment the the previous statement (the one which sends [0, 0]).

I found an easy way to solve problem number 2, by adjusting the reprepare code to update column_metadata inside the PreparedStatement object (following the logic used when PreparedStatement is first created by prepare: column_metadata is set to the response bind_metadata):

diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 5f2669c0..aaff3432 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -4963,6 +4963,7 @@ class ResponseFuture(object):
                             )
                         ))
                     self.prepared_statement.result_metadata = response.column_metadata
+                    self.prepared_statement.column_metadata = response.bind_metadata
                     new_metadata_id = response.result_metadata_id
                     if new_metadata_id is not None:
                         self.prepared_statement.result_metadata_id = new_metadata_id

(this is in _execute_after_prepare function in cassandra/cluster.py).

(BTW. the names are weird, yes)

However I don't know how to solve problem 1. In problem 1, the driver does not even attempt a reprepare, because it fails on serialization stage before even sending the request, due to outdated column_metadata which it apparently uses to do the serialization.


The problems have different symptoms (although the root causes are the same), if instead changing the type of a column using alter ks.t alter x type ..., we introduce a user-defined type, and then add a column to this type. Consider this test:

import logging
import pytest
from dataclasses import dataclass
from test.pylib.manager_client import ManagerClient

logger = logging.getLogger(__name__)

@dataclass
class Typ1:
    a: int

@dataclass
class Typ2:
    a: int
    b: int

@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
    s1 = await manager.server_add()
    cql, _ = await manager.get_ready_cql([s1])

    logger.info("ks")
    await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    await cql.run_async("create type ks.typ (a int)")
    await cql.run_async("create table ks.t (pk int primary key, x frozen<typ>)")

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [Typ1(0), 0])

    await cql.run_async("alter type ks.typ add b int")
    #stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [Typ2(0, 0), 1])
    await cql.run_async(stmt, [Typ2(0, 0), 2])

    rs = await cql.run_async("select pk, x from ks.t")
    logger.info(list(rs))

There are no failures. However, the result is:
[Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=None))]

So b is None for pk=1 and pk=2, even though we bound Typ2(0, 0) when updating those keys.

After my column_metadata fix, the result changes to:
[Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=0))]

So for pk=1 the result is still None. Even though this statement causes a repreparation; the driver should in theory resend the query with updated column_metadata so b should get updated for pk=1 too.

But for pk=2 the result is correct -- apparently this time the updated column_metadata helped.

Uncommenting the explicit prepare also helps.


So is this a bug?

Well, at least Rust driver does provide a much better user experience. This (executed against pre-existing single-node cluster):

use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;

use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = SessionBuilder::new() .known_node(uri) .build() .await?;
    let session = Arc::new(session);

    session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
    session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
    session.query("CREATE TABLE ks.t ( a int primary key, b int)", &[],).await?;

    let insert = Arc::new(session.prepare("UPDATE ks.t SET b = ?  WHERE a = ?  ").await?);

    if let Err(e) = session.execute(&insert, (0 as i32, 0 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    session.query("ALTER TABLE ks.t alter b type blob", &[],).await?;

    if let Err(e) = session.execute(&insert, (Vec::<u8>::new(), 1 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    std::process::exit(0);
}

works out of the box, and gives

cqlsh> select * from ks.t;

 a | b
---+------------
 1 |         0x
 0 | 0x00000000

(2 rows)

looking at the code, apparently Rust driver does not have a corresponding structure to Python driver's column_metadata / bind_metadata to be used for serialization; instead, it seems to use the types coming with the tuple. And the first statement after the ALTER reprepares, but does not seem to update the insert object, which apparently is not necessary for this driver.

Similarly the experience is better with UDTs:

use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;

use std::sync::Arc;

use scylla::macros::{FromUserType, IntoUserType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = SessionBuilder::new() .known_node(uri) .build() .await?;
    let session = Arc::new(session);

    session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
    session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
    session.query("CREATE TYPE ks.typ (a int,); ", &[],).await?;
    session.query("CREATE TABLE ks.t ( a int primary key, x frozen<typ>)", &[],).await?;

    #[derive(Debug, IntoUserType, FromUserType, Clone)]
    struct Typ1 {
        a: i32,
    }

    #[derive(Debug, IntoUserType, FromUserType, Clone)]
    struct Typ2 {
        a: i32,
        b: i32,
    }

    let insert = Arc::new(session.prepare("UPDATE ks.t SET x = ?  WHERE a = ?  ").await?);

    if let Err(e) = session.execute(&insert, (&Typ1{a: 0}, 0 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    session.query("ALTER type ks.typ add b int", &[],).await?;

    if let Err(e) = session.execute(&insert, (&Typ2{a: 0, b: 0}, 1 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    std::process::exit(0);
}

result:

cqlsh> select * from ks.t;

 a | x
---+-----------------
 1 |    {a: 0, b: 0}
 0 | {a: 0, b: null}

(2 rows)

cc @Lorak-mmk

Metadata

Metadata

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions