Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

add a compaction filter long term case #378

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tests/gc-in-compaction-filter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### Prepare Data

```
$ sysbench --mysql-host=<host> --mysql-port=<port> --mysql-user=root --tables=128 --table-size=1000000 --threads=32 --time=600 common.lua prepare
```

### Run the Test Case

```
./run.sh <tidb-host> <tidb-port>
```

### TODO list
- [ ] support to automatic run it on tipocket
164 changes: 164 additions & 0 deletions tests/gc-in-compaction-filter/common.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
function init() end

if sysbench.cmdline.command == nil then
error("Command is required. Supported commands: prepare, run, help")
end

sysbench.cmdline.options = {
table_size = {"Number of rows per table", 10000},
tables = {"Number of tables", 1},
modifies = {"Row to be updated in one transaction", 5}
}

function cmd_prepare()
local drv = sysbench.sql.driver()
local con = drv:connect()
local threads = sysbench.opt.threads
for i = sysbench.tid % threads + 1, sysbench.opt.tables, threads do
create_table(drv, con, i)
end
end

sysbench.cmdline.commands = {
prepare = {cmd_prepare, sysbench.cmdline.PARALLEL_COMMAND}
}

-- 20 groups, 249 characters
local c_value_template = "###########-###########-###########-" ..
"###########-###########-###########-" ..
"###########-###########-###########-" ..
"###########-###########-###########-" ..
"###########-###########-###########-" ..
"###########-###########-###########-" ..
"###########-###########"

-- 4 group, 47 characters
local pad_value_template = "###########-###########-###########-###########"

function get_c_value() return sysbench.rand.string(c_value_template) end

function get_pad_value() return sysbench.rand.string(pad_value_template) end

function create_table(drv, con, table_num)
local id_def = "BIGINT NOT NULL"
local id_index_def = "PRIMARY KEY"
local engine_def = ""
local extra_table_options = ""

print(string.format("Creating table 'sbtest%d'...", table_num))
local query = string.format([[
CREATE TABLE IF NOT EXISTS sbtest%d(
id %s,
k BIGINT DEFAULT '0' NOT NULL,
c CHAR(255) DEFAULT '' NOT NULL,
pad CHAR(60) DEFAULT '' NOT NULL,
id_suffix %s,
PRIMARY KEY (id),
UNIQUE KEY (id_suffix),
KEY (k)
) %s %s]], table_num, id_def, id_def, engine_def,
extra_table_options)
con:query(query)

print(string.format("Inserting %d records into 'sbtest%d'",
sysbench.opt.table_size, table_num))

query = "INSERT INTO sbtest" .. table_num ..
"(id, k, c, pad, id_suffix) VALUES"

con:bulk_insert_init(query)
for i = 1, sysbench.opt.table_size do
local c_val = ""
if sysbench.rand.uniform(1, 6) % 6 == 3 then
c_val = get_c_value()
end
local pad_val = get_pad_value()
query = string.format("(%d, %d, '%s', '%s', %d)", i, sb_rand(1, 255),
c_val, pad_val, i)
con:bulk_insert_next(query)
end
con:bulk_insert_done()
end

local t = sysbench.sql.type
local stmt_defs = {
delete = {"DELETE FROM sbtest%u WHERE id = ?", t.INT},
insert = {
"INSERT INTO sbtest%u (id, k, c, pad, id_suffix) VALUES (?, ?, ?, ?, ?)",
t.INT, t.INT, {t.CHAR, 255}, {t.CHAR, 60}, t.INT
},
update = {"UPDATE sbtest%u SET k = k + ? WHERE id = ?", t.INT, t.INT}
}

function prepare_begin() stmt.begin = con:prepare("BEGIN") end

function prepare_commit() stmt.commit = con:prepare("COMMIT") end

function prepare_for_each_table(key)
for t = 1, sysbench.opt.tables do
stmt[t][key] = con:prepare(string.format(stmt_defs[key][1], t))

local nparam = #stmt_defs[key] - 1
if nparam > 0 then param[t][key] = {} end
for p = 1, nparam do
local btype = stmt_defs[key][p + 1]
local len
if type(btype) == "table" then
len = btype[2]
btype = btype[1]
end
if btype == sysbench.sql.type.VARCHAR or btype ==
sysbench.sql.type.CHAR then
param[t][key][p] = stmt[t][key]:bind_create(btype, len)
else
param[t][key][p] = stmt[t][key]:bind_create(btype)
end
end
if nparam > 0 then stmt[t][key]:bind_param(unpack(param[t][key])) end
end
end

function thread_init()
drv = sysbench.sql.driver()
con = drv:connect()

stmt = {}
param = {}
for t = 1, sysbench.opt.tables do
stmt[t] = {}
param[t] = {}
end
prepare_statements()
end

function close_statements()
for t = 1, sysbench.opt.tables do
for k, s in pairs(stmt[t]) do stmt[t][k]:close() end
end
if (stmt.begin ~= nil) then stmt.begin:close() end
if (stmt.commit ~= nil) then stmt.commit:close() end
end

function thread_done()
close_statements()
con:disconnect()
end

function get_table_num() return sysbench.rand.uniform(1, sysbench.opt.tables) end

function get_id() return sysbench.rand.uniform(1, sysbench.opt.table_size) end

function begin() stmt.begin:execute() end

function commit() stmt.commit:execute() end

function sysbench.hooks.before_restart_event(errdesc)
if errdesc.sql_errno == 2013 or -- CR_SERVER_LOST
errdesc.sql_errno == 2055 or -- CR_SERVER_LOST_EXTENDED
errdesc.sql_errno == 2006 or -- CR_SERVER_GONE_ERROR
errdesc.sql_errno == 2011 -- CR_TCP_CONNECTION
then
close_statements()
prepare_statements()
end
end
20 changes: 20 additions & 0 deletions tests/gc-in-compaction-filter/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
host=$1
port=$2

init_timeout=--thread-init-timeout=1000

for i in `seq 1 100`; do
if [ $(($i % 2)) -eq 1 ]; then
echo "enable compaction filter"
mysql -h $host --port=$port -u root -e "set config tikv gc.enable-compaction-filter = true"
else
echo "disable compaction filter"
mysql -h $host --port=$port -u root -e "set config tikv gc.enable-compaction-filter = false"
fi
sysbench $init_timeout --mysql-host=$host --mysql-port=$port --mysql-user=root --tables=128 --table-size=1000000 --threads=128 --time=600 updates run
if [ $? -ne 0 ]; then
echo "fail"
return
fi
done
echo "success"
143 changes: 143 additions & 0 deletions tests/gc-in-compaction-filter/updates.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
require("common")

function sleep(n) os.execute("sleep " .. n) end

function get_random_zero_sum_seq(len)
local sum = 0
local t = {}
for i = 1, len / 2 do
local x = sysbench.rand.uniform(1, 255)
table.insert(t, x)
table.insert(t, -x)
end
if #t == len - 1 then table.insert(t, 0) end
return t
end

function prepare_statements()
prepare_begin()
prepare_commit()
prepare_for_each_table("delete")
prepare_for_each_table("insert")
prepare_for_each_table("update")
end

function too_many_processlist(con)
local rs = con:query("show processlist")
local busy_count = 0
for i = 1, rs.nrows do
local command = unpack(rs:fetch_row(), 5, 5)
if command ~= "Sleep" then busy_count = busy_count + 1 end
end
rs:free()
return busy_count >= 20
end

function get_counters(con, tid)
local sql = "select sum(k), count(id) from sbtest" .. tid
local rs = con:query(sql)
local sum_k_s, count_id_s = unpack(rs:fetch_row(), 1, 2)
rs:free()
local sum_k = tonumber(sum_k_s)
local count_id = tonumber(count_id_s)
local sql = string.format("select count(k) from sbtest%u use index(k)", tid)
local rs = con:query(sql)
local count_k_s = unpack(rs:fetch_row(), 1, 1)
rs:free()
local count_k = tonumber(count_k_s)
return sum_k, count_id, count_k
end

function thread_init()
drv = sysbench.sql.driver()
con = drv:connect()
stmt = {}
param = {}
for t = 1, sysbench.opt.tables do
stmt[t] = {}
param[t] = {}
end
prepare_statements()

local tid = sysbench.tid % sysbench.opt.threads + 1
if tid <= sysbench.opt.tables then
while too_many_processlist(con) do sleep(1) end
sum_k, count_id, count_k = get_counters(con, tid)
print("sbtest" .. tid .. ", sum(k): " .. sum_k .. ", count(id): " ..
count_id .. ", count(k): " .. count_k)
end
print("enable async commit for session: " .. sysbench.tid % sysbench.opt.tables)
con:query("set session tidb_enable_async_commit = 1")
-- print("enable follower read for session: " .. sysbench.tid % sysbench.opt.tables)
-- con:query("set @@tidb_replica_read = 'leader-and-follower'")
end

function thread_done()
local tid = sysbench.tid % sysbench.opt.threads + 1
if tid <= sysbench.opt.tables then
while too_many_processlist(con) do sleep(1) end
local sum_k_1, count_id_1, count_k_1 = get_counters(con, tid)
if sum_k_1 ~= sum_k or count_id_1 ~= count_id or count_k_1 ~= count_k then
print("corrupt sbtest" .. tid .. ", sum(k): " .. sum_k ..
", count(id): " .. count_id .. ", count(k): " .. count_k)
os.exit(-1)
end
print("consistency check for sbtest" .. tid .. " success")
end
close_statements()
con:disconnect()
end

function event()
begin()

local tnum = get_table_num()
local id_suffix = get_id()
local rs = con:query(string.format(
"SELECT id,k,id_suffix FROM sbtest%u WHERE id_suffix BETWEEN %d AND %d for update",
tnum, id_suffix, id_suffix + sysbench.opt.modifies))
local update_list = {}
local del_ins_list = {}
for i = 1, rs.nrows do
local id, k, suffix = unpack(rs:fetch_row(), 1, rs.nfields)
id = tonumber(id)
k = tonumber(k)
suffix = tonumber(suffix)

if sysbench.rand.uniform(1, 2) % 2 == 1 then
local t = {}
t["id"] = id
t["k"] = k
t["suffix"] = suffix
table.insert(del_ins_list, t)
else
table.insert(update_list, id)
end
end
rs:free()

for i = 1, #del_ins_list do
local id = del_ins_list[i]["id"]
param[tnum].delete[1]:set(id)
stmt[tnum].delete:execute()
param[tnum].insert[1]:set(id + sysbench.opt.table_size)
param[tnum].insert[2]:set(del_ins_list[i]["k"])
if sysbench.rand.uniform(1, 6) % 6 == 3 then
param[tnum].insert[3]:set(get_c_value())
else
param[tnum].insert[3]:set("")
end
param[tnum].insert[4]:set(get_pad_value())
param[tnum].insert[5]:set(del_ins_list[i]["suffix"])
stmt[tnum].insert:execute()
end

local zero_sum_seq = get_random_zero_sum_seq(#update_list)
for i = 1, #update_list do
param[tnum].update[1]:set(zero_sum_seq[i])
param[tnum].update[2]:set(update_list[i])
stmt[tnum].update:execute()
end

commit()
end