Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2998,9 +2998,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
message = ExecuteMessage(
prepared_statement.query_id, query.values, cl,
serial_cl, fetch_size, paging_state, timestamp,
skip_meta=bool(prepared_statement.result_metadata),
continuous_paging_options=continuous_paging_options,
result_metadata_id=prepared_statement.result_metadata_id)
continuous_paging_options=continuous_paging_options)
elif isinstance(query, BatchStatement):
if self._protocol_version < 2:
raise UnsupportedOperation(
Expand Down Expand Up @@ -4627,6 +4625,16 @@ def _query(self, host, message=None, cb=None):
self._connection = connection
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []

if self.prepared_statement and isinstance(message, ExecuteMessage):
has_result_metadata_id = self.prepared_statement.result_metadata_id is not None
has_result_metadata = bool(self.prepared_statement.result_metadata)
use_metadata_id = has_result_metadata_id and has_result_metadata and (
ProtocolVersion.uses_prepared_metadata(connection.protocol_version)
or connection.features.use_metadata_id
)
message.skip_meta = use_metadata_id
message.result_metadata_id = self.prepared_statement.result_metadata_id if use_metadata_id else None
Comment on lines +4629 to +4636
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High: this gate enables skip_meta for any prepared statement with a non-None result_metadata_id, but some prepared statements still legitimately have no cached result metadata. The repo already has that case in tests/integration/standard/test_prepared_statements.py (_test_updated_conditional asserts prepared_statement.result_metadata is None while result_metadata_id stays set for prepared conditional/LWT statements).

With SCYLLA_USE_METADATA_ID negotiated, this branch will set skip_meta=True and send that metadata id anyway. On Scylla, if the request/response metadata ids match, the server keeps NO_METADATA on the EXECUTE response instead of forcing metadata back, so the driver reaches recv_results_rows() with neither response metadata nor cached metadata to decode against. That turns into a real decode failure, not just a missed optimization.

I think this needs one more safety condition: only enable skip_meta when the prepared statement has usable cached result metadata, and keep it disabled for statements prepared with NO_METADATA / empty result metadata.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added has_result_metadata = bool(self.prepared_statement.result_metadata) as an additional condition in the use_metadata_id gate — skip_meta is now only enabled when the prepared statement has both a result_metadata_id and usable cached result metadata. LWT/conditional statements (INSERT ... IF NOT EXISTS etc.) have result_metadata_id set but result_metadata = None (the PREPARE response carries NO_METADATA for the result columns), so they correctly fall through to skip_meta=False and the server always sends full metadata.

On the test side: added test_query_no_skip_meta_when_result_metadata_is_none to directly cover this case, and corrected two existing _query tests (test_query_sets_skip_meta_with_scylla_extension, test_query_sets_skip_meta_for_protocol_v5) that were using an empty list [] for result_metadata — those were accidentally falsy and would have hidden this regression going forward.


if cb is None:
cb = partial(self._set_result, host, connection, pool)

Expand Down Expand Up @@ -4783,6 +4791,24 @@ def _set_result(self, host, connection, pool, response):
self._paging_state = response.paging_state
self._col_names = response.column_names
self._col_types = response.column_types
new_result_metadata_id = getattr(response, 'result_metadata_id', None)
if self.prepared_statement and new_result_metadata_id is not None:
if response.column_metadata:
# Write result_metadata before result_metadata_id intentionally:
# a concurrent reader that still sees the old metadata_id will
# ask the server for full metadata and recover safely; a reader
# that sees the new metadata_id together with the new metadata
# is immediately correct. The opposite write order could expose
# a window where a reader uses a new metadata_id with stale metadata.
self.prepared_statement.result_metadata = response.column_metadata
else:
log.warning(
"Server sent a new result_metadata_id but no column metadata "
"for prepared statement %r. The cached column metadata will not "
"be updated; only result_metadata_id is refreshed.",
getattr(self.prepared_statement, 'query_id', None)
)
self.prepared_statement.result_metadata_id = new_result_metadata_id
if getattr(self.message, 'continuous_paging_options', None):
self._handle_continuous_paging_first_response(connection, response)
else:
Expand Down
7 changes: 6 additions & 1 deletion cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ def _write_query_params(self, f, protocol_version):
if self.timestamp is not None:
flags |= _PROTOCOL_TIMESTAMP_FLAG

if self.skip_meta:
flags |= _SKIP_METADATA_FLAG

if self.keyspace is not None:
if ProtocolVersion.uses_keyspace_flag(protocol_version):
flags |= _WITH_KEYSPACE_FLAG
Expand Down Expand Up @@ -642,6 +645,8 @@ def send_body(self, f, protocol_version):
write_string(f, self.query_id)
if ProtocolVersion.uses_prepared_metadata(protocol_version):
write_string(f, self.result_metadata_id)
elif self.result_metadata_id is not None:
write_string(f, self.result_metadata_id)
Comment on lines +648 to +649
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecuteMessage.send_body() now writes result_metadata_id for protocol versions that don’t use standard prepared-metadata (pre-v5) whenever self.result_metadata_id is non-None. For pre-v5 this extra field is only valid when SCYLLA_USE_METADATA_ID was negotiated on that connection; otherwise it changes the wire layout (the server will interpret the metadata id bytes as the query parameters) and the request will fail.

To make this safe, ensure the decision to include result_metadata_id is gated by the negotiated feature (e.g., add an explicit use_metadata_id/send_result_metadata_id flag on the message that the caller sets based on connection.features.use_metadata_id, or guarantee centrally that result_metadata_id is cleared unless the extension is active for that connection).

Suggested change
elif self.result_metadata_id is not None:
write_string(f, self.result_metadata_id)

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The elif self.result_metadata_id is not None path in send_body is now only reached when the caller explicitly set the field — which only happens in _query() after confirming connection.features.use_metadata_id (or CQL v5). For any connection that didn't negotiate the extension, result_metadata_id remains None and the branch is never taken, so the wire layout is unaffected.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a problem here. If the result id feature is negotiated on the connection, then you need to ALWAYS send some result metadata id in EXECUTE. Skipping the write_string will result in a protocol error.
Your use_metadata_id may be False even if extension was negotiated, if the server decided to skip the metadata in PREPARED response. In such case, you'll skip writing the id here, and encounter protocol error.

Even if you fix this specific case, there is still possibility of mixed cluster, with some nodes supporting the extension. In that case result_metadata_id will be None, and if you send to a node that has the extension negotiated, you'll again not send the id and encounter protocol error.

To sum up: this serialization here should check if feature is negotiated, and base sending this field only on that.

self._write_query_params(f, protocol_version)


Expand Down Expand Up @@ -745,7 +750,7 @@ def decode_row(row):

def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
self.query_id = read_binary_string(f)
if ProtocolVersion.uses_prepared_metadata(protocol_version):
if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id:
self.result_metadata_id = read_binary_string(f)
else:
self.result_metadata_id = None
Expand Down
16 changes: 14 additions & 2 deletions cassandra/protocol_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,34 @@
LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK"
RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR"
TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1"
USE_METADATA_ID = "SCYLLA_USE_METADATA_ID"

class ProtocolFeatures(object):
rate_limit_error = None
shard_id = 0
sharding_info = None
tablets_routing_v1 = False
lwt_info = None
use_metadata_id = False

def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None):
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None,
use_metadata_id=False):
self.rate_limit_error = rate_limit_error
self.shard_id = shard_id
self.sharding_info = sharding_info
self.tablets_routing_v1 = tablets_routing_v1
self.lwt_info = lwt_info
self.use_metadata_id = use_metadata_id

@staticmethod
def parse_from_supported(supported):
rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported)
shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported)
tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported)
lwt_info = ProtocolFeatures.parse_lwt_info(supported)
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info)
use_metadata_id = ProtocolFeatures.parse_use_metadata_id(supported)
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info,
use_metadata_id)

@staticmethod
def maybe_parse_rate_limit_error(supported):
Expand All @@ -57,6 +63,8 @@ def add_startup_options(self, options):
options[TABLETS_ROUTING_V1] = ""
if self.lwt_info is not None:
options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask)
if self.use_metadata_id:
options[USE_METADATA_ID] = ""

@staticmethod
def parse_sharding_info(options):
Expand All @@ -81,6 +89,10 @@ def parse_sharding_info(options):
def parse_tablets_info(options):
return TABLETS_ROUTING_V1 in options

@staticmethod
def parse_use_metadata_id(options):
return USE_METADATA_ID in options

@staticmethod
def parse_lwt_info(options):
value_list = options.get(LWT_ADD_METADATA_MARK, [None])
Expand Down
40 changes: 40 additions & 0 deletions docs/scylla-specific.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,43 @@ https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md

Details on the sending tablet information to the drivers
https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md#sending-tablet-info-to-the-drivers


Prepared Statement Metadata Caching (``SCYLLA_USE_METADATA_ID``)
----------------------------------------------------------------

When executing prepared SELECT statements, the driver normally requests the server
to skip sending full result metadata with each response (``skip_meta`` optimization),
relying on the metadata cached from the initial ``PREPARE`` call. However, if the
table schema changes after a statement is prepared (e.g., a column is added, removed,
or its type is altered), this cached metadata becomes stale — leading to decoding
errors or incorrect data.

ScyllaDB solves this by backporting the ``metadata_id`` mechanism from CQL native
protocol v5 as a v4 extension: ``SCYLLA_USE_METADATA_ID``. When this extension is
negotiated, the server includes a hash of the result metadata in the ``PREPARE``
response. The driver sends this hash back with every ``EXECUTE`` request. If the
schema has changed, the server sets the ``METADATA_CHANGED`` flag and returns the
new metadata hash together with the updated column definitions. The driver
automatically updates its cache and uses the new metadata to decode the current
response — all transparently, with no application code change required.

**Behaviour summary:**

- Automatically negotiated at connection time when the ScyllaDB node supports it.
- ``skip_meta`` is enabled (metadata omitted from EXECUTE responses) only when it
is safe: the connection must have negotiated ``SCYLLA_USE_METADATA_ID`` (or use
CQL v5), *and* the prepared statement must carry a ``result_metadata_id`` obtained
from PREPARE.
- When a schema change is detected by the server, the driver refreshes both the
cached column metadata and the metadata hash for that prepared statement so that
all subsequent executions benefit immediately.
- Statements prepared before the extension was negotiated (e.g., during a rolling
upgrade) retain ``result_metadata_id=None`` and fall back to always requesting
full metadata, which is the safest option.

**Current scope:** schema-change detection is implemented for SELECT statements.
UPDATE/INSERT coverage is planned in a separate effort.

For full protocol details see the ScyllaDB CQL extensions documentation:
https://opensource.docs.scylladb.com/stable/cql/cql-extensions.html
126 changes: 125 additions & 1 deletion tests/unit/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import struct
import unittest

from unittest.mock import Mock
Expand All @@ -21,8 +23,11 @@
PrepareMessage, QueryMessage, ExecuteMessage, UnsupportedOperation,
_PAGING_OPTIONS_FLAG, _WITH_SERIAL_CONSISTENCY_FLAG,
_PAGE_SIZE_FLAG, _WITH_PAGING_STATE_FLAG,
BatchMessage
_SKIP_METADATA_FLAG,
BatchMessage, ResultMessage,
RESULT_KIND_ROWS
)
from cassandra.protocol_features import ProtocolFeatures
from cassandra.query import BatchType
from cassandra.marshal import uint32_unpack
from cassandra.cluster import ContinuousPagingOptions
Expand Down Expand Up @@ -68,6 +73,125 @@ def test_execute_message(self):
(b'\x00\x04',),
(b'\x00\x00\x00\x01',), (b'\x00\x00',)])

def test_execute_message_skip_meta_flag(self):
"""skip_meta=True must set _SKIP_METADATA_FLAG (0x02) in the flags byte."""
message = ExecuteMessage('1', [], 4, skip_meta=True)
mock_io = Mock()

message.send_body(mock_io, 4)
# flags byte should be VALUES_FLAG | SKIP_METADATA_FLAG = 0x01 | 0x02 = 0x03
self._check_calls(mock_io, [(b'\x00\x01',), (b'1',), (b'\x00\x04',), (b'\x03',), (b'\x00\x00',)])

def test_execute_message_scylla_metadata_id_v4(self):
"""result_metadata_id should be written on protocol v4 when set (Scylla extension)."""
message = ExecuteMessage('1', [], 4)
message.result_metadata_id = b'foo'
mock_io = Mock()

message.send_body(mock_io, 4)
# metadata_id written before query params (same position as v5)
self._check_calls(mock_io, [(b'\x00\x01',), (b'1',),
(b'\x00\x03',), (b'foo',),
(b'\x00\x04',), (b'\x01',), (b'\x00\x00',)])

def test_recv_results_prepared_scylla_extension_reads_metadata_id(self):
"""
When use_metadata_id is True (Scylla extension), result_metadata_id must be
read from the PREPARE response even for protocol v4.
"""
# Build a minimal valid PREPARE response binary (no bind/result columns):
# query_id: short(2) + b'ab'
# result_metadata_id: short(3) + b'xyz' <-- only present when extension active
# prepared flags: int(1) = global_tables_spec
# colcount: int(0)
# num_pk_indexes: int(0)
# ksname: short(2) + b'ks'
# cfname: short(2) + b'tb'
# result flags: int(4) = no_metadata
# result colcount: int(0)
buf = io.BytesIO(
struct.pack('>H', 2) + b'ab' # query_id
+ struct.pack('>H', 3) + b'xyz' # result_metadata_id
+ struct.pack('>i', 1) # prepared flags: global_tables_spec
+ struct.pack('>i', 0) # colcount = 0
+ struct.pack('>i', 0) # num_pk_indexes = 0
+ struct.pack('>H', 2) + b'ks' # ksname
+ struct.pack('>H', 2) + b'tb' # cfname
+ struct.pack('>i', 4) # result flags: no_metadata
+ struct.pack('>i', 0) # result colcount = 0
)

features_with_extension = ProtocolFeatures(use_metadata_id=True)
msg = ResultMessage(kind=4) # RESULT_KIND_PREPARED = 4
msg.recv_results_prepared(buf, protocol_version=4,
protocol_features=features_with_extension,
user_type_map={})
assert msg.query_id == b'ab'
assert msg.result_metadata_id == b'xyz'

def test_recv_results_prepared_no_extension_skips_metadata_id(self):
"""
Without use_metadata_id, result_metadata_id must NOT be read on protocol v4.
The buffer must NOT contain a metadata_id field.
"""
buf = io.BytesIO(
struct.pack('>H', 2) + b'ab' # query_id
# no result_metadata_id
+ struct.pack('>i', 1) # prepared flags: global_tables_spec
+ struct.pack('>i', 0) # colcount = 0
+ struct.pack('>i', 0) # num_pk_indexes = 0
+ struct.pack('>H', 2) + b'ks' # ksname
+ struct.pack('>H', 2) + b'tb' # cfname
+ struct.pack('>i', 4) # result flags: no_metadata
+ struct.pack('>i', 0) # result colcount = 0
)

features_without_extension = ProtocolFeatures(use_metadata_id=False)
msg = ResultMessage(kind=4)
msg.recv_results_prepared(buf, protocol_version=4,
protocol_features=features_without_extension,
user_type_map={})
assert msg.query_id == b'ab'
assert msg.result_metadata_id is None

def test_recv_results_metadata_changed_flag(self):
"""
When _METADATA_ID_FLAG (0x0008) is set in a ROWS result,
recv_results_metadata must read and store the new result_metadata_id
sent by the server (METADATA_CHANGED signal), and still populate
column_metadata normally.
"""
# Wire layout for a ROWS result with METADATA_CHANGED:
# flags: int(0x0008) = _METADATA_ID_FLAG
# colcount: int(0)
# result_metadata_id: short(4) + b'new1'
# (no columns — colcount=0 — to keep the buffer minimal)
buf = io.BytesIO(
struct.pack('>i', 0x0008) # flags: METADATA_ID_FLAG
+ struct.pack('>i', 0) # colcount = 0
+ struct.pack('>H', 4) + b'new1' # result_metadata_id = b'new1'
)
msg = ResultMessage(kind=RESULT_KIND_ROWS)
msg.recv_results_metadata(buf, user_type_map={})
assert msg.result_metadata_id == b'new1'
assert msg.column_metadata == []

def test_recv_results_metadata_no_metadata_flag_skips_metadata_id(self):
"""
When _NO_METADATA_FLAG (0x0004) is set, recv_results_metadata returns
early and must NOT read or set result_metadata_id, even if the caller
mistakenly sets _METADATA_ID_FLAG alongside it.
"""
# flags = _NO_METADATA_FLAG (0x0004), colcount = 0
buf = io.BytesIO(
struct.pack('>i', 0x0004) # flags: NO_METADATA
+ struct.pack('>i', 0) # colcount = 0
)
msg = ResultMessage(kind=RESULT_KIND_ROWS)
msg.recv_results_metadata(buf, user_type_map={})
assert not hasattr(msg, 'result_metadata_id') or msg.result_metadata_id is None
assert not hasattr(msg, 'column_metadata') or msg.column_metadata is None

def test_query_message(self):
"""
Test to check the appropriate calls are made
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/test_protocol_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,38 @@ class OptionsHolder(object):
assert protocol_features.rate_limit_error == 123
assert protocol_features.shard_id == 0
assert protocol_features.sharding_info is None

def test_use_metadata_id_parsing(self):
"""
Test that SCYLLA_USE_METADATA_ID is parsed from SUPPORTED options.
"""
options = {'SCYLLA_USE_METADATA_ID': ['']}
protocol_features = ProtocolFeatures.parse_from_supported(options)
assert protocol_features.use_metadata_id is True

def test_use_metadata_id_missing(self):
"""
Test that use_metadata_id is False when SCYLLA_USE_METADATA_ID is absent.
"""
options = {'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=1']}
protocol_features = ProtocolFeatures.parse_from_supported(options)
assert protocol_features.use_metadata_id is False

def test_use_metadata_id_startup_options(self):
"""
Test that SCYLLA_USE_METADATA_ID is included in STARTUP options when negotiated.
"""
options = {'SCYLLA_USE_METADATA_ID': ['']}
protocol_features = ProtocolFeatures.parse_from_supported(options)
startup = {}
protocol_features.add_startup_options(startup)
assert 'SCYLLA_USE_METADATA_ID' in startup

def test_use_metadata_id_not_in_startup_when_not_negotiated(self):
"""
Test that SCYLLA_USE_METADATA_ID is NOT included in STARTUP when not negotiated.
"""
protocol_features = ProtocolFeatures.parse_from_supported({})
startup = {}
protocol_features.add_startup_options(startup)
assert 'SCYLLA_USE_METADATA_ID' not in startup
Loading
Loading