Skip to content

Commit a94c6c2

Browse files
Add test and documentation for the Write a test for MySQL 5.7 column name support
1 parent 6a6ce0a commit a94c6c2

File tree

5 files changed

+171
-12
lines changed

5 files changed

+171
-12
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ Other contributors:
366366
- Oliver Seemann: Handle large json, github actions,
367367
Zero-pad fixed-length binary fields (https://github.com/oseemann)
368368
- Mahadir Ahmad: Handle null json payload (https://github.com/mahadirz)
369+
- Mehmet Kartalbas: Add MySQL 5.7 column name support (https://github.com/kartalbas)
369370
- Axel Viala: Removal of Python 2.7 (https://github.com/darnuria)
370371
- Etern: Add XAPrepareEvent, parse last_committed & sequence_number of GtidEvent (https://github.com/etern)
371372
- Jason Fulghum: typo in ident variable name (https://github.com/fulghum)

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Contents
2626

2727
installation
2828
limitations
29+
mysql57_support
2930
binlogstream
3031
events
3132
examples

docs/mysql57_support.rst

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
.. _mysql57_support:
2+
3+
MySQL 5.7, MySQL 8.0+ and `use_column_name_cache`
4+
==================================================
5+
6+
In MySQL 5.7 and earlier, the binary log events for row-based replication do not include column name metadata. This means that `python-mysql-replication` cannot map column values to their names directly from the binlog event.
7+
8+
Starting with MySQL 8.0.1, the `binlog_row_metadata` system variable was introduced to control the amount of metadata written to the binary log. This is a **GLOBAL** and **DYNAMIC** variable. The default value for this variable is `MINIMAL`, which provides the same behavior as MySQL 5.7.
9+
10+
The Problem
11+
-----------
12+
13+
When column metadata is not present in the binlog (as in MySQL 5.7 and earlier, or when `binlog_row_metadata` is set to `MINIMAL` globally in MySQL 8.0+), the `values` dictionary in a `WriteRowsEvent`, `UpdateRowsEvent`, or `DeleteRowsEvent` will contain integer keys corresponding to the column index, not the column names.
14+
15+
For example, for a table `users` with columns `id` and `name`, an insert event might look like this:
16+
17+
.. code-block:: python
18+
19+
{0: 1, 1: 'John Doe'}
20+
21+
This can make your replication logic harder to write and maintain, as you need to know the column order.
22+
23+
The Solution: `use_column_name_cache`
24+
-------------------------------------
25+
26+
To address this, `python-mysql-replication` provides the `use_column_name_cache` parameter for the `BinLogStreamReader`.
27+
28+
When you set `use_column_name_cache=True`, the library will perform a query to the `INFORMATION_SCHEMA.COLUMNS` table to fetch the column names for a given table the first time it encounters an event for that table. The column names are then cached in memory for subsequent events for the same table, avoiding redundant queries.
29+
30+
This allows you to receive row data with column names as keys.
31+
32+
MySQL 8.0+ with `binlog_row_metadata=FULL`
33+
------------------------------------------
34+
35+
In MySQL 8.0.1 and later, you can set `binlog_row_metadata` to `FULL` using `SET GLOBAL binlog_row_metadata = 'FULL'`. When this setting is enabled, the column names are included directly in the binlog events, and `use_column_name_cache` is not necessary.
36+
37+
Example
38+
-------
39+
40+
Here is how to enable the column name cache when needed:
41+
42+
.. code-block:: python
43+
44+
from pymysqlreplication import BinLogStreamReader
45+
46+
mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}
47+
48+
# Enable the column name cache for MySQL 5.7 or MySQL 8.0+ with binlog_row_metadata=MINIMAL
49+
stream = BinLogStreamReader(
50+
connection_settings=mysql_settings,
51+
server_id=100,
52+
use_column_name_cache=True
53+
)
54+
55+
for binlogevent in stream:
56+
if isinstance(binlogevent, WriteRowsEvent):
57+
# Now you can access values by column name
58+
user_id = binlogevent.rows[0]["values"]["id"]
59+
user_name = binlogevent.rows[0]["values"]["name"]
60+
print(f"New user: id={user_id}, name={user_name}")
61+
62+
stream.close()
63+
64+
Important Considerations
65+
------------------------
66+
67+
* **Performance:** Enabling `use_column_name_cache` will result in an extra query to the database for each new table encountered in the binlog. The results are cached, so the performance impact should be minimal after the initial query for each table.
68+
* **Permissions:** The MySQL user used for replication must have `SELECT` privileges on the `INFORMATION_SCHEMA.COLUMNS` table.
69+
* **Default Behavior:** This feature is disabled by default (`use_column_name_cache=False`) to maintain backward compatibility and to avoid making extra queries unless explicitly requested.

pymysqlreplication/tests/base.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import pymysql
21
import copy
3-
from pymysqlreplication import BinLogStreamReader
4-
import os
52
import json
3+
import os
4+
import unittest
5+
6+
import pymysql
67
import pytest
78

8-
import unittest
9+
from pymysqlreplication import BinLogStreamReader
910

1011

1112
def get_databases():
@@ -89,6 +90,12 @@ def isMySQL57(self):
8990
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
9091
return version == 5.7
9192

93+
def isMySQL57AndMore(self):
94+
if self.isMariaDB():
95+
return False
96+
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
97+
return version >= 5.7
98+
9299
def isMySQL80AndMore(self):
93100
if self.isMariaDB():
94101
return False

pymysqlreplication/tests/test_basic.py

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
import io
22
import time
33
import unittest
4+
from unittest.mock import patch
5+
6+
from pymysql.protocol import MysqlPacket
47

5-
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
6-
from pymysqlreplication.tests import base
78
from pymysqlreplication import BinLogStreamReader
8-
from pymysqlreplication.gtid import GtidSet, Gtid
9-
from pymysqlreplication.event import *
109
from pymysqlreplication.constants.BINLOG import *
1110
from pymysqlreplication.constants.NONE_SOURCE import *
12-
from pymysqlreplication.row_event import *
11+
from pymysqlreplication.event import *
12+
from pymysqlreplication.gtid import Gtid, GtidSet
13+
from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
1314
from pymysqlreplication.packet import BinLogPacketWrapper
14-
from pymysql.protocol import MysqlPacket
15-
from unittest.mock import patch
16-
15+
from pymysqlreplication.row_event import *
16+
from pymysqlreplication.tests import base
1717

1818
__all__ = [
1919
"TestBasicBinLogStreamReader",
@@ -271,6 +271,87 @@ def test_write_row_event(self):
271271
self.assertEqual(event.rows[0]["values"]["data"], "Hello World")
272272
self.assertEqual(event.columns[1].name, "data")
273273

274+
def test_fetch_column_names_from_schema(self):
275+
# This test is for scenarios where column names are NOT in the binlog
276+
# (MySQL 5.7 or older, or MySQL 8.0+ with binlog_row_metadata=MINIMAL)
277+
278+
# Check if binlog_row_metadata exists (MySQL 8.0+)
279+
try:
280+
cursor = self.execute("SHOW GLOBAL VARIABLES LIKE 'binlog_row_metadata'")
281+
result = cursor.fetchone()
282+
if result:
283+
global_binlog_row_metadata = result[1]
284+
if global_binlog_row_metadata == 'FULL':
285+
self.skipTest("binlog_row_metadata is FULL globally, use_column_name_cache is not needed")
286+
# If result is None, binlog_row_metadata doesn't exist (MySQL 5.7 or older), so proceed
287+
except pymysql.err.OperationalError as e:
288+
if e.args[0] == 1193: # ER_UNKNOWN_SYSTEM_VARIABLE
289+
# Variable doesn't exist, likely MySQL 5.7 or older, so proceed
290+
pass
291+
else:
292+
raise
293+
294+
query = "CREATE TABLE test_column_cache (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
295+
self.execute(query)
296+
self.execute("INSERT INTO test_column_cache (data) VALUES('Hello')")
297+
self.execute("COMMIT")
298+
299+
# Test with use_column_name_cache = True
300+
self.stream.close()
301+
self.stream = BinLogStreamReader(
302+
self.database,
303+
server_id=1024,
304+
use_column_name_cache=True,
305+
only_events=[WriteRowsEvent],
306+
)
307+
308+
event = self.stream.fetchone()
309+
self.assertIsInstance(event, WriteRowsEvent)
310+
self.assertEqual(event.table, "test_column_cache")
311+
self.assertIn("id", event.rows[0]["values"])
312+
self.assertIn("data", event.rows[0]["values"])
313+
self.assertEqual(event.rows[0]["values"]["id"], 1)
314+
self.assertEqual(event.rows[0]["values"]["data"], "Hello")
315+
316+
# Test with use_column_name_cache = False
317+
self.stream.close()
318+
319+
# Clear cache before next run
320+
from pymysqlreplication import row_event
321+
row_event._COLUMN_NAME_CACHE.clear()
322+
323+
self.stream = BinLogStreamReader(
324+
self.database,
325+
server_id=1025, # different server_id to avoid caching issues
326+
use_column_name_cache=False,
327+
only_events=[WriteRowsEvent],
328+
)
329+
330+
# Reset and replay events
331+
self.resetBinLog()
332+
self.execute("INSERT INTO test_column_cache (data) VALUES('World')")
333+
self.execute("COMMIT")
334+
335+
# Skip RotateEvent and FormatDescriptionEvent
336+
self.stream.fetchone()
337+
self.stream.fetchone()
338+
# Skip QueryEvent for BEGIN
339+
if not self.isMariaDB():
340+
self.stream.fetchone()
341+
# Skip TableMapEvent
342+
self.stream.fetchone()
343+
344+
event = self.stream.fetchone()
345+
self.assertIsInstance(event, WriteRowsEvent)
346+
self.assertEqual(event.table, "test_column_cache")
347+
# With cache disabled, we should not have column names
348+
self.assertNotIn("id", event.rows[0]["values"])
349+
self.assertNotIn("data", event.rows[0]["values"])
350+
351+
# cleanup
352+
row_event._COLUMN_NAME_CACHE.clear()
353+
354+
274355
def test_delete_row_event(self):
275356
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
276357
self.execute(query)

0 commit comments

Comments
 (0)