diff --git a/tests/gc-in-compaction-filter/README.md b/tests/gc-in-compaction-filter/README.md new file mode 100644 index 00000000..ff967828 --- /dev/null +++ b/tests/gc-in-compaction-filter/README.md @@ -0,0 +1,14 @@ +### Prepare Data + +``` +$ sysbench --mysql-host= --mysql-port= --mysql-user=root --tables=128 --table-size=1000000 --threads=32 --time=600 common.lua prepare +``` + +### Run the Test Case + +``` +./run.sh +``` + +### TODO list +- [ ] support to automatic run it on tipocket diff --git a/tests/gc-in-compaction-filter/common.lua b/tests/gc-in-compaction-filter/common.lua new file mode 100644 index 00000000..c9d47194 --- /dev/null +++ b/tests/gc-in-compaction-filter/common.lua @@ -0,0 +1,164 @@ +function init() end + +if sysbench.cmdline.command == nil then + error("Command is required. Supported commands: prepare, run, help") +end + +sysbench.cmdline.options = { + table_size = {"Number of rows per table", 10000}, + tables = {"Number of tables", 1}, + modifies = {"Row to be updated in one transaction", 5} +} + +function cmd_prepare() + local drv = sysbench.sql.driver() + local con = drv:connect() + local threads = sysbench.opt.threads + for i = sysbench.tid % threads + 1, sysbench.opt.tables, threads do + create_table(drv, con, i) + end +end + +sysbench.cmdline.commands = { + prepare = {cmd_prepare, sysbench.cmdline.PARALLEL_COMMAND} +} + +-- 20 groups, 249 characters +local c_value_template = "###########-###########-###########-" .. + "###########-###########-###########-" .. + "###########-###########-###########-" .. + "###########-###########-###########-" .. + "###########-###########-###########-" .. + "###########-###########-###########-" .. + "###########-###########" + +-- 4 group, 47 characters +local pad_value_template = "###########-###########-###########-###########" + +function get_c_value() return sysbench.rand.string(c_value_template) end + +function get_pad_value() return sysbench.rand.string(pad_value_template) end + +function create_table(drv, con, table_num) + local id_def = "BIGINT NOT NULL" + local id_index_def = "PRIMARY KEY" + local engine_def = "" + local extra_table_options = "" + + print(string.format("Creating table 'sbtest%d'...", table_num)) + local query = string.format([[ + CREATE TABLE IF NOT EXISTS sbtest%d( + id %s, + k BIGINT DEFAULT '0' NOT NULL, + c CHAR(255) DEFAULT '' NOT NULL, + pad CHAR(60) DEFAULT '' NOT NULL, + id_suffix %s, + PRIMARY KEY (id), + UNIQUE KEY (id_suffix), + KEY (k) + ) %s %s]], table_num, id_def, id_def, engine_def, + extra_table_options) + con:query(query) + + print(string.format("Inserting %d records into 'sbtest%d'", + sysbench.opt.table_size, table_num)) + + query = "INSERT INTO sbtest" .. table_num .. + "(id, k, c, pad, id_suffix) VALUES" + + con:bulk_insert_init(query) + for i = 1, sysbench.opt.table_size do + local c_val = "" + if sysbench.rand.uniform(1, 6) % 6 == 3 then + c_val = get_c_value() + end + local pad_val = get_pad_value() + query = string.format("(%d, %d, '%s', '%s', %d)", i, sb_rand(1, 255), + c_val, pad_val, i) + con:bulk_insert_next(query) + end + con:bulk_insert_done() +end + +local t = sysbench.sql.type +local stmt_defs = { + delete = {"DELETE FROM sbtest%u WHERE id = ?", t.INT}, + insert = { + "INSERT INTO sbtest%u (id, k, c, pad, id_suffix) VALUES (?, ?, ?, ?, ?)", + t.INT, t.INT, {t.CHAR, 255}, {t.CHAR, 60}, t.INT + }, + update = {"UPDATE sbtest%u SET k = k + ? WHERE id = ?", t.INT, t.INT} +} + +function prepare_begin() stmt.begin = con:prepare("BEGIN") end + +function prepare_commit() stmt.commit = con:prepare("COMMIT") end + +function prepare_for_each_table(key) + for t = 1, sysbench.opt.tables do + stmt[t][key] = con:prepare(string.format(stmt_defs[key][1], t)) + + local nparam = #stmt_defs[key] - 1 + if nparam > 0 then param[t][key] = {} end + for p = 1, nparam do + local btype = stmt_defs[key][p + 1] + local len + if type(btype) == "table" then + len = btype[2] + btype = btype[1] + end + if btype == sysbench.sql.type.VARCHAR or btype == + sysbench.sql.type.CHAR then + param[t][key][p] = stmt[t][key]:bind_create(btype, len) + else + param[t][key][p] = stmt[t][key]:bind_create(btype) + end + end + if nparam > 0 then stmt[t][key]:bind_param(unpack(param[t][key])) end + end +end + +function thread_init() + drv = sysbench.sql.driver() + con = drv:connect() + + stmt = {} + param = {} + for t = 1, sysbench.opt.tables do + stmt[t] = {} + param[t] = {} + end + prepare_statements() +end + +function close_statements() + for t = 1, sysbench.opt.tables do + for k, s in pairs(stmt[t]) do stmt[t][k]:close() end + end + if (stmt.begin ~= nil) then stmt.begin:close() end + if (stmt.commit ~= nil) then stmt.commit:close() end +end + +function thread_done() + close_statements() + con:disconnect() +end + +function get_table_num() return sysbench.rand.uniform(1, sysbench.opt.tables) end + +function get_id() return sysbench.rand.uniform(1, sysbench.opt.table_size) end + +function begin() stmt.begin:execute() end + +function commit() stmt.commit:execute() end + +function sysbench.hooks.before_restart_event(errdesc) + if errdesc.sql_errno == 2013 or -- CR_SERVER_LOST + errdesc.sql_errno == 2055 or -- CR_SERVER_LOST_EXTENDED + errdesc.sql_errno == 2006 or -- CR_SERVER_GONE_ERROR + errdesc.sql_errno == 2011 -- CR_TCP_CONNECTION + then + close_statements() + prepare_statements() + end +end diff --git a/tests/gc-in-compaction-filter/run.sh b/tests/gc-in-compaction-filter/run.sh new file mode 100755 index 00000000..d718ad81 --- /dev/null +++ b/tests/gc-in-compaction-filter/run.sh @@ -0,0 +1,20 @@ +host=$1 +port=$2 + +init_timeout=--thread-init-timeout=1000 + +for i in `seq 1 100`; do + if [ $(($i % 2)) -eq 1 ]; then + echo "enable compaction filter" + mysql -h $host --port=$port -u root -e "set config tikv gc.enable-compaction-filter = true" + else + echo "disable compaction filter" + mysql -h $host --port=$port -u root -e "set config tikv gc.enable-compaction-filter = false" + fi + sysbench $init_timeout --mysql-host=$host --mysql-port=$port --mysql-user=root --tables=128 --table-size=1000000 --threads=128 --time=600 updates run + if [ $? -ne 0 ]; then + echo "fail" + return + fi +done +echo "success" diff --git a/tests/gc-in-compaction-filter/updates.lua b/tests/gc-in-compaction-filter/updates.lua new file mode 100644 index 00000000..482386a0 --- /dev/null +++ b/tests/gc-in-compaction-filter/updates.lua @@ -0,0 +1,143 @@ +require("common") + +function sleep(n) os.execute("sleep " .. n) end + +function get_random_zero_sum_seq(len) + local sum = 0 + local t = {} + for i = 1, len / 2 do + local x = sysbench.rand.uniform(1, 255) + table.insert(t, x) + table.insert(t, -x) + end + if #t == len - 1 then table.insert(t, 0) end + return t +end + +function prepare_statements() + prepare_begin() + prepare_commit() + prepare_for_each_table("delete") + prepare_for_each_table("insert") + prepare_for_each_table("update") +end + +function too_many_processlist(con) + local rs = con:query("show processlist") + local busy_count = 0 + for i = 1, rs.nrows do + local command = unpack(rs:fetch_row(), 5, 5) + if command ~= "Sleep" then busy_count = busy_count + 1 end + end + rs:free() + return busy_count >= 20 +end + +function get_counters(con, tid) + local sql = "select sum(k), count(id) from sbtest" .. tid + local rs = con:query(sql) + local sum_k_s, count_id_s = unpack(rs:fetch_row(), 1, 2) + rs:free() + local sum_k = tonumber(sum_k_s) + local count_id = tonumber(count_id_s) + local sql = string.format("select count(k) from sbtest%u use index(k)", tid) + local rs = con:query(sql) + local count_k_s = unpack(rs:fetch_row(), 1, 1) + rs:free() + local count_k = tonumber(count_k_s) + return sum_k, count_id, count_k +end + +function thread_init() + drv = sysbench.sql.driver() + con = drv:connect() + stmt = {} + param = {} + for t = 1, sysbench.opt.tables do + stmt[t] = {} + param[t] = {} + end + prepare_statements() + + local tid = sysbench.tid % sysbench.opt.threads + 1 + if tid <= sysbench.opt.tables then + while too_many_processlist(con) do sleep(1) end + sum_k, count_id, count_k = get_counters(con, tid) + print("sbtest" .. tid .. ", sum(k): " .. sum_k .. ", count(id): " .. + count_id .. ", count(k): " .. count_k) + end + print("enable async commit for session: " .. sysbench.tid % sysbench.opt.tables) + con:query("set session tidb_enable_async_commit = 1") + -- print("enable follower read for session: " .. sysbench.tid % sysbench.opt.tables) + -- con:query("set @@tidb_replica_read = 'leader-and-follower'") +end + +function thread_done() + local tid = sysbench.tid % sysbench.opt.threads + 1 + if tid <= sysbench.opt.tables then + while too_many_processlist(con) do sleep(1) end + local sum_k_1, count_id_1, count_k_1 = get_counters(con, tid) + if sum_k_1 ~= sum_k or count_id_1 ~= count_id or count_k_1 ~= count_k then + print("corrupt sbtest" .. tid .. ", sum(k): " .. sum_k .. + ", count(id): " .. count_id .. ", count(k): " .. count_k) + os.exit(-1) + end + print("consistency check for sbtest" .. tid .. " success") + end + close_statements() + con:disconnect() +end + +function event() + begin() + + local tnum = get_table_num() + local id_suffix = get_id() + local rs = con:query(string.format( + "SELECT id,k,id_suffix FROM sbtest%u WHERE id_suffix BETWEEN %d AND %d for update", + tnum, id_suffix, id_suffix + sysbench.opt.modifies)) + local update_list = {} + local del_ins_list = {} + for i = 1, rs.nrows do + local id, k, suffix = unpack(rs:fetch_row(), 1, rs.nfields) + id = tonumber(id) + k = tonumber(k) + suffix = tonumber(suffix) + + if sysbench.rand.uniform(1, 2) % 2 == 1 then + local t = {} + t["id"] = id + t["k"] = k + t["suffix"] = suffix + table.insert(del_ins_list, t) + else + table.insert(update_list, id) + end + end + rs:free() + + for i = 1, #del_ins_list do + local id = del_ins_list[i]["id"] + param[tnum].delete[1]:set(id) + stmt[tnum].delete:execute() + param[tnum].insert[1]:set(id + sysbench.opt.table_size) + param[tnum].insert[2]:set(del_ins_list[i]["k"]) + if sysbench.rand.uniform(1, 6) % 6 == 3 then + param[tnum].insert[3]:set(get_c_value()) + else + param[tnum].insert[3]:set("") + end + param[tnum].insert[4]:set(get_pad_value()) + param[tnum].insert[5]:set(del_ins_list[i]["suffix"]) + stmt[tnum].insert:execute() + end + + local zero_sum_seq = get_random_zero_sum_seq(#update_list) + for i = 1, #update_list do + param[tnum].update[1]:set(zero_sum_seq[i]) + param[tnum].update[2]:set(update_list[i]) + stmt[tnum].update:execute() + end + + commit() +end