Merged
1 change: 1 addition & 0 deletions README.md
@@ -366,6 +366,7 @@ Other contributors:
- Oliver Seemann: Handle large json, github actions,
Zero-pad fixed-length binary fields (https://github.com/oseemann)
- Mahadir Ahmad: Handle null json payload (https://github.com/mahadirz)
- Mehmet Kartalbas: Add MySQL 5.7 column name support (https://github.com/kartalbas)
- Axel Viala: Removal of Python 2.7 (https://github.com/darnuria)
- Etern: Add XAPrepareEvent, parse last_committed & sequence_number of GtidEvent (https://github.com/etern)
- Jason Fulghum: typo in ident variable name (https://github.com/fulghum)
1 change: 1 addition & 0 deletions docs/index.rst
@@ -26,6 +26,7 @@ Contents

installation
limitations
mysql57_support
binlogstream
events
examples
69 changes: 69 additions & 0 deletions docs/mysql57_support.rst
@@ -0,0 +1,69 @@
.. _mysql57_support:

MySQL 5.7, MySQL 8.0+ and ``use_column_name_cache``
====================================================

In MySQL 5.7 and earlier, the binary log events for row-based replication do not include column name metadata. This means that ``python-mysql-replication`` cannot map column values to their names directly from the binlog event.

Starting with MySQL 8.0.1, the ``binlog_row_metadata`` system variable controls how much table metadata is written to the binary log. It is a **GLOBAL** and **DYNAMIC** variable whose default value, ``MINIMAL``, provides the same behavior as MySQL 5.7.

The Problem
-----------

When column metadata is not present in the binlog (as in MySQL 5.7 and earlier, or when ``binlog_row_metadata`` is set to ``MINIMAL`` globally in MySQL 8.0+), the ``values`` dictionary in a ``WriteRowsEvent``, ``UpdateRowsEvent``, or ``DeleteRowsEvent`` will contain integer keys corresponding to the column index, not the column names.

For example, for a table ``users`` with columns ``id`` and ``name``, an insert event might look like this:

.. code-block:: python

    {0: 1, 1: 'John Doe'}

This can make your replication logic harder to write and maintain, as you need to know the column order.
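Without column names, callers typically have to carry the column order themselves. A minimal sketch of that workaround (``COLUMN_ORDER`` and ``map_row`` are hypothetical names; the order must mirror the table definition exactly):

```python
# Hypothetical workaround: rebuild name-keyed rows from index-keyed ones,
# using a column order maintained by hand for each table.
COLUMN_ORDER = ["id", "name"]  # must match the table's column order

def map_row(values_by_index, column_order):
    """Replace integer column indexes with the corresponding names."""
    return {column_order[i]: v for i, v in values_by_index.items()}

print(map_row({0: 1, 1: "John Doe"}, COLUMN_ORDER))
# {'id': 1, 'name': 'John Doe'}
```

The obvious drawback is that every ``ALTER TABLE`` silently breaks the hand-maintained order, which is exactly what the cache below avoids.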

The Solution: ``use_column_name_cache``
---------------------------------------

To address this, ``python-mysql-replication`` provides the ``use_column_name_cache`` parameter on ``BinLogStreamReader``.

When you set ``use_column_name_cache=True``, the library queries ``INFORMATION_SCHEMA.COLUMNS`` for a table's column names the first time it encounters an event for that table. The names are then cached in memory for subsequent events on the same table, avoiding redundant queries.

This allows you to receive row data with column names as keys.

MySQL 8.0+ with ``binlog_row_metadata=FULL``
--------------------------------------------

In MySQL 8.0.1 and later, you can set ``binlog_row_metadata`` to ``FULL`` using ``SET GLOBAL binlog_row_metadata = 'FULL'``. With this setting enabled, column names are written directly into the binlog events, and ``use_column_name_cache`` is not necessary.
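Whether the cache is needed can be decided once at startup by inspecting the server. A sketch of that decision logic (``needs_column_name_cache`` is a hypothetical helper; on MySQL 5.7 the ``binlog_row_metadata`` variable does not exist, which is modeled as ``None`` here):

```python
def needs_column_name_cache(binlog_row_metadata):
    """Decide whether use_column_name_cache should be enabled.

    binlog_row_metadata: the value reported by
    SHOW GLOBAL VARIABLES LIKE 'binlog_row_metadata', or None when
    the variable is absent (MySQL 5.7 and earlier, which never
    write column metadata to the binlog).
    """
    return binlog_row_metadata is None or binlog_row_metadata.upper() != "FULL"

print(needs_column_name_cache(None))       # True  -> MySQL 5.7, enable the cache
print(needs_column_name_cache("MINIMAL"))  # True  -> 8.0+ default, enable the cache
print(needs_column_name_cache("FULL"))     # False -> names already in the binlog
```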

Example
-------

Here is how to enable the column name cache when needed:

.. code-block:: python

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import WriteRowsEvent

    mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}

    # Enable the column name cache for MySQL 5.7, or for MySQL 8.0+
    # running with binlog_row_metadata=MINIMAL
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=100,
        use_column_name_cache=True,
    )

    for binlogevent in stream:
        if isinstance(binlogevent, WriteRowsEvent):
            # Values are now accessible by column name
            for row in binlogevent.rows:
                user_id = row["values"]["id"]
                user_name = row["values"]["name"]
                print(f"New user: id={user_id}, name={user_name}")

    stream.close()

Important Considerations
------------------------

* **Performance:** Enabling ``use_column_name_cache`` adds one extra query per new table encountered in the binlog. The results are cached, so the impact is negligible after the initial query for each table.
* **Permissions:** The MySQL user used for replication must be able to see the replicated tables in ``INFORMATION_SCHEMA.COLUMNS``, which requires holding some privilege on those tables.
* **Schema changes:** The cache lives at module level and is never invalidated, so an ``ALTER TABLE`` that adds, drops, or renames columns is not picked up until the process restarts.
* **Default Behavior:** This feature is disabled by default (``use_column_name_cache=False``) to maintain backward compatibility and to avoid extra queries unless explicitly requested.
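Because the cache shown in ``row_event.py`` is a plain module-level dict keyed by ``schema.table``, a long-running process can evict stale entries after a schema change. A sketch of such a helper (``invalidate_column_cache`` is hypothetical; it would be handed the library's private ``_COLUMN_NAME_CACHE``, an internal detail that may change):

```python
def invalidate_column_cache(cache, schema=None, table=None):
    """Evict one table's cached column names, or everything.

    cache: the dict used as the column-name cache, e.g.
    pymysqlreplication.row_event._COLUMN_NAME_CACHE (private API).
    """
    if schema is not None and table is not None:
        cache.pop(f"{schema}.{table}", None)  # silently ignore missing entries
    else:
        cache.clear()

cache = {"shop.users": ["id", "name"], "shop.orders": ["id", "total"]}
invalidate_column_cache(cache, "shop", "users")
print(cache)  # {'shop.orders': ['id', 'total']}
```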
7 changes: 7 additions & 0 deletions pymysqlreplication/binlogstream.py
@@ -188,6 +188,7 @@ def __init__(
ignore_decode_errors=False,
verify_checksum=False,
enable_logging=True,
use_column_name_cache=False,
):
"""
Attributes:
@@ -230,6 +231,8 @@ def __init__(
verify_checksum: If true, verify events read from the binary log by examining checksums.
enable_logging: When set to True, logs various details helpful for debugging and monitoring
When set to False, logging is disabled to enhance performance.
use_column_name_cache: If true, enables caching of column names from INFORMATION_SCHEMA
for MySQL 5.7 compatibility when binlog metadata is missing. Default is False.
"""

self.__connection_settings = connection_settings
@@ -254,6 +257,8 @@ def __init__(
self.__ignore_decode_errors = ignore_decode_errors
self.__verify_checksum = verify_checksum
self.__optional_meta_data = False
self.__enable_logging = enable_logging
self.__use_column_name_cache = use_column_name_cache

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
@@ -630,6 +635,8 @@ def fetchone(self):
self.__ignore_decode_errors,
self.__verify_checksum,
self.__optional_meta_data,
self.__enable_logging,
self.__use_column_name_cache,
)

if binlog_event.event_type == ROTATE_EVENT:
2 changes: 2 additions & 0 deletions pymysqlreplication/event.py
@@ -28,6 +28,8 @@ def __init__(
ignore_decode_errors=False,
verify_checksum=False,
optional_meta_data=False,
enable_logging=False,
use_column_name_cache=False,
):
self.packet = from_packet
self.table_map = table_map
6 changes: 6 additions & 0 deletions pymysqlreplication/packet.py
@@ -74,6 +74,8 @@ def __init__(
ignore_decode_errors,
verify_checksum,
optional_meta_data,
enable_logging,
use_column_name_cache=False,
):
# -1 because we ignore the ok byte
self.read_bytes = 0
@@ -82,6 +84,8 @@ def __init__(

self.packet = from_packet
self.charset = ctl_connection.charset
self.enable_logging = enable_logging
self.use_column_name_cache = use_column_name_cache

# OK value
# timestamp
@@ -127,6 +131,8 @@ def __init__(
ignore_decode_errors=ignore_decode_errors,
verify_checksum=verify_checksum,
optional_meta_data=optional_meta_data,
enable_logging=enable_logging,
use_column_name_cache=use_column_name_cache,
)
if not self.event._processed:
self.event = None
69 changes: 67 additions & 2 deletions pymysqlreplication/row_event.py
@@ -1,6 +1,7 @@
import struct
import decimal
import datetime
import logging

from pymysql.charset import charset_by_name
from enum import Enum
@@ -15,6 +16,10 @@
from .bitmap import BitCount, BitGet



# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names
_COLUMN_NAME_CACHE = {}

class RowsEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
@@ -746,6 +751,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.__ignored_schemas = kwargs["ignored_schemas"]
self.__freeze_schema = kwargs["freeze_schema"]
self.__optional_meta_data = kwargs["optional_meta_data"]
self.__enable_logging = kwargs.get("enable_logging", False)
self.__use_column_name_cache = kwargs.get("use_column_name_cache", False)
# Post-Header
self.table_id = self._read_table_id()

@@ -909,12 +916,70 @@ def _get_optional_meta_data(self):

return optional_metadata


def _fetch_column_names_from_schema(self):
"""
Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility.

Only executes if use_column_name_cache=True is enabled.
Uses module-level cache to avoid repeated queries.

Returns:
list: Column names in ORDINAL_POSITION order, or empty list
"""
# Only fetch if explicitly enabled (opt-in feature)
if not self.__use_column_name_cache:
return []

cache_key = f"{self.schema}.{self.table}"

# Check cache first
if cache_key in _COLUMN_NAME_CACHE:
return _COLUMN_NAME_CACHE[cache_key]

try:
query = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
"""
cursor = self._ctl_connection.cursor()
cursor.execute(query, (self.schema, self.table))
rows = cursor.fetchall()
# Handle both tuple and dict cursor results
if rows and isinstance(rows[0], dict):
column_names = [row['COLUMN_NAME'] for row in rows]
else:
column_names = [row[0] for row in rows]
cursor.close()

# Cache result
_COLUMN_NAME_CACHE[cache_key] = column_names

if self.__enable_logging and column_names:
logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns")

return column_names
except Exception as e:
if self.__enable_logging:
logging.warning(f"Failed to fetch column names for {cache_key}: {type(e).__name__}: {e}")
# Cache empty result to avoid retry spam
_COLUMN_NAME_CACHE[cache_key] = []
return []

def _sync_column_info(self):
if not self.__optional_meta_data:
# If optional_meta_data is False Do not sync Event Time Column Schemas
column_names = self._fetch_column_names_from_schema()
if column_names and len(column_names) == self.column_count:
for column_idx in range(self.column_count):
self.columns[column_idx].name = column_names[column_idx]
return
if len(self.optional_metadata.column_name_list) == 0:
# May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL
column_names = self._fetch_column_names_from_schema()
if column_names and len(column_names) == self.column_count:
for column_idx in range(self.column_count):
self.columns[column_idx].name = column_names[column_idx]
return
charset_pos = 0
enum_or_set_pos = 0
15 changes: 11 additions & 4 deletions pymysqlreplication/tests/base.py
@@ -1,11 +1,12 @@
import pymysql
import copy
from pymysqlreplication import BinLogStreamReader
import os
import json
import os
import unittest

import pymysql
import pytest

import unittest
from pymysqlreplication import BinLogStreamReader


def get_databases():
@@ -89,6 +90,12 @@ def isMySQL57(self):
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
return version == 5.7

def isMySQL57AndMore(self):
if self.isMariaDB():
return False
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
return version >= 5.7

def isMySQL80AndMore(self):
if self.isMariaDB():
return False