Skip to content

Commit 6a6ce0a

Browse files
kartalbas authored and julien-duponchelle committed
Add MySQL 5.7 column name support
- Query INFORMATION_SCHEMA for column names when not in binlog
- Module-level cache to prevent repeated queries
- Opt-in via use_column_name_cache parameter
- Handle both dict and tuple cursor types
- Backward compatible (disabled by default)
1 parent 9c3bd9c commit 6a6ce0a

File tree

5 files changed

+84
-2
lines changed

5 files changed

+84
-2
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def __init__(
188188
ignore_decode_errors=False,
189189
verify_checksum=False,
190190
enable_logging=True,
191+
use_column_name_cache=False,
191192
):
192193
"""
193194
Attributes:
@@ -230,6 +231,8 @@ def __init__(
230231
verify_checksum: If true, verify events read from the binary log by examining checksums.
231232
enable_logging: When set to True, logs various details helpful for debugging and monitoring
232233
When set to False, logging is disabled to enhance performance.
234+
use_column_name_cache: If true, enables caching of column names from INFORMATION_SCHEMA
235+
for MySQL 5.7 compatibility when binlog metadata is missing. Default is False.
233236
"""
234237

235238
self.__connection_settings = connection_settings
@@ -254,6 +257,8 @@ def __init__(
254257
self.__ignore_decode_errors = ignore_decode_errors
255258
self.__verify_checksum = verify_checksum
256259
self.__optional_meta_data = False
260+
self.__enable_logging = enable_logging
261+
self.__use_column_name_cache = use_column_name_cache
257262

258263
# We can't filter on packet level TABLE_MAP and rotate event because
259264
# we need them for handling other operations
@@ -630,6 +635,8 @@ def fetchone(self):
630635
self.__ignore_decode_errors,
631636
self.__verify_checksum,
632637
self.__optional_meta_data,
638+
self.__enable_logging,
639+
self.__use_column_name_cache,
633640
)
634641

635642
if binlog_event.event_type == ROTATE_EVENT:

pymysqlreplication/event.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ def __init__(
2828
ignore_decode_errors=False,
2929
verify_checksum=False,
3030
optional_meta_data=False,
31+
enable_logging=False,
32+
use_column_name_cache=False,
3133
):
3234
self.packet = from_packet
3335
self.table_map = table_map

pymysqlreplication/packet.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ def __init__(
7474
ignore_decode_errors,
7575
verify_checksum,
7676
optional_meta_data,
77+
enable_logging,
78+
use_column_name_cache=False,
7779
):
7880
# -1 because we ignore the ok byte
7981
self.read_bytes = 0
@@ -82,6 +84,8 @@ def __init__(
8284

8385
self.packet = from_packet
8486
self.charset = ctl_connection.charset
87+
self.enable_logging = enable_logging
88+
self.use_column_name_cache = use_column_name_cache
8589

8690
# OK value
8791
# timestamp
@@ -127,6 +131,8 @@ def __init__(
127131
ignore_decode_errors=ignore_decode_errors,
128132
verify_checksum=verify_checksum,
129133
optional_meta_data=optional_meta_data,
134+
enable_logging=enable_logging,
135+
use_column_name_cache=use_column_name_cache,
130136
)
131137
if not self.event._processed:
132138
self.event = None

pymysqlreplication/row_event.py

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import struct
22
import decimal
33
import datetime
4+
import logging
45

56
from pymysql.charset import charset_by_name
67
from enum import Enum
@@ -15,6 +16,10 @@
1516
from .bitmap import BitCount, BitGet
1617

1718

19+
20+
# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names
21+
_COLUMN_NAME_CACHE = {}
22+
1823
class RowsEvent(BinLogEvent):
1924
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
2025
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
@@ -746,6 +751,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
746751
self.__ignored_schemas = kwargs["ignored_schemas"]
747752
self.__freeze_schema = kwargs["freeze_schema"]
748753
self.__optional_meta_data = kwargs["optional_meta_data"]
754+
self.__enable_logging = kwargs.get("enable_logging", False)
755+
self.__use_column_name_cache = kwargs.get("use_column_name_cache", False)
749756
# Post-Header
750757
self.table_id = self._read_table_id()
751758

@@ -909,12 +916,70 @@ def _get_optional_meta_data(self):
909916

910917
return optional_metadata
911918

919+
920+
def _fetch_column_names_from_schema(self):
    """
    Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility.

    Only executes if use_column_name_cache=True is enabled.
    Uses the module-level ``_COLUMN_NAME_CACHE`` (keyed by
    ``"<schema>.<table>"``) to avoid repeated queries.

    Any error raised while querying is swallowed on purpose: it is
    logged as a warning when logging is enabled, and an empty list is
    cached for the table so a failing lookup is not retried on every
    event.

    Returns:
        list: Column names in ORDINAL_POSITION order, or empty list
              when the feature is disabled, the query fails, or no
              rows are returned.
    """
    # Only fetch if explicitly enabled (opt-in feature)
    if not self.__use_column_name_cache:
        return []

    # NOTE(review): the key does not include the server/connection, so
    # streams reading from different servers in one process share entries
    # for identically named tables -- confirm whether that is acceptable.
    cache_key = f"{self.schema}.{self.table}"

    # Check cache first (an empty list is a valid cached value)
    cached_names = _COLUMN_NAME_CACHE.get(cache_key)
    if cached_names is not None:
        return cached_names

    query = """
        SELECT COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
    """

    try:
        cursor = self._ctl_connection.cursor()
        try:
            cursor.execute(query, (self.schema, self.table))
            rows = cursor.fetchall()
        finally:
            # Always release the cursor, even when the query fails, so a
            # broken lookup does not leak cursors on the control connection.
            cursor.close()

        # Handle both tuple and dict cursor results
        if rows and isinstance(rows[0], dict):
            column_names = [row['COLUMN_NAME'] for row in rows]
        else:
            column_names = [row[0] for row in rows]
    except Exception as e:
        if self.__enable_logging:
            logging.warning(f"Failed to fetch column names for {cache_key}: {type(e).__name__}: {e}")
        # Cache empty result to avoid retry spam
        _COLUMN_NAME_CACHE[cache_key] = []
        return []

    # Cache result
    _COLUMN_NAME_CACHE[cache_key] = column_names

    if self.__enable_logging and column_names:
        logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns")

    return column_names
970+
912971
def _sync_column_info(self):
913972
if not self.__optional_meta_data:
914-
# If optional_meta_data is False Do not sync Event Time Column Schemas
973+
column_names = self._fetch_column_names_from_schema()
974+
if column_names and len(column_names) == self.column_count:
975+
for column_idx in range(self.column_count):
976+
self.columns[column_idx].name = column_names[column_idx]
915977
return
916978
if len(self.optional_metadata.column_name_list) == 0:
917-
# May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL
979+
column_names = self._fetch_column_names_from_schema()
980+
if column_names and len(column_names) == self.column_count:
981+
for column_idx in range(self.column_count):
982+
self.columns[column_idx].name = column_names[column_idx]
918983
return
919984
charset_pos = 0
920985
enum_or_set_pos = 0

pymysqlreplication/tests/test_basic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,8 @@ def create_binlog_packet_wrapper(pkt):
619619
self.stream._BinLogStreamReader__ignore_decode_errors,
620620
self.stream._BinLogStreamReader__verify_checksum,
621621
self.stream._BinLogStreamReader__optional_meta_data,
622+
self.stream._BinLogStreamReader__enable_logging,
623+
self.stream._BinLogStreamReader__use_column_name_cache,
622624
)
623625

624626
self.stream.close()

0 commit comments

Comments
 (0)