Skip to content

Commit 645fec6

Browse files
committed
utube: implement ready space for vinyl engine
In case of a transaction conflict for 'vinyl' we need to retry an entire transaction. Part of #230
1 parent df24f17 commit 645fec6

File tree

2 files changed

+148
-107
lines changed

2 files changed

+148
-107
lines changed

queue/abstract/driver/utube.lua

Lines changed: 139 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,6 @@ function tube.new(space, on_task_change, opts)
8585

8686
local space_ready_buffer_name = space.name .. "_ready_buffer"
8787
local space_ready_buffer = box.space[space_ready_buffer_name]
88-
-- Feature implemented only for memtx engine for now.
89-
-- https://github.com/tarantool/queue/issues/230.
90-
if opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER and opts.engine == 'vinyl' then
91-
error(string.format('"%s" storage mode cannot be used with vinyl engine',
92-
tube.STORAGE_MODE_READY_BUFFER))
93-
end
9488

9589
local ready_space_mode = (opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER)
9690
if ready_space_mode then
@@ -167,6 +161,10 @@ local function commit()
167161
box.commit()
168162
end
169163

164+
local function rollback()
165+
box.rollback()
166+
end
167+
170168
local function empty()
171169
end
172170

@@ -179,14 +177,31 @@ local function begin_if_not_in_txn()
179177

180178
if not box.is_in_txn() then
181179
box.begin(transaction_opts)
182-
return commit
180+
return commit, rollback
183181
else
184-
return empty
182+
return empty, empty
183+
end
184+
end
185+
186+
-- Try commiting operations until success. This is required for 'vinyl' engine.
187+
-- In case of a transaction conflict for 'vinyl' we need to retry an entire
188+
-- transaction.
189+
local function try_commit_several_times(func, ...)
190+
local ok = false
191+
local ret
192+
while not ok do
193+
local commit_func, rollback_func = begin_if_not_in_txn()
194+
ok, ret = pcall(func, commit_func, ...)
195+
if ok then
196+
return ret
197+
end
198+
rollback_func()
199+
require('fiber').yield()
185200
end
186201
end
187202

188203
-- put task in space
189-
function method.put(self, data, opts)
204+
local function put(self, data, opts, commit_func)
190205
-- Taking the minimum is an implicit transactions, so it is
191206
-- always done with 'read-confirmed' mvcc isolation level.
192207
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
@@ -197,8 +212,6 @@ function method.put(self, data, opts)
197212
-- since it will open nested transactions.
198213
-- See https://github.com/tarantool/queue/issues/207
199214
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
200-
local commit_func = begin_if_not_in_txn()
201-
202215
local max = self.space.index.task_id:max()
203216

204217
local id = max and max[1] + 1 or 0
@@ -213,11 +226,18 @@ function method.put(self, data, opts)
213226
return task
214227
end
215228

229+
-- put task in space
230+
function method.put(self, data, opts)
231+
local commit_body = function(commit_func)
232+
return put(self, data, opts, commit_func)
233+
end
234+
235+
return try_commit_several_times(commit_body)
236+
end
237+
216238
-- Take the first task form the ready_buffer.
217-
local function take_ready(self)
239+
local function take_ready(self, commit_func)
218240
while true do
219-
local commit_func = begin_if_not_in_txn()
220-
221241
local task_ready = self.space_ready_buffer.index.task_id:min()
222242
if task_ready == nil then
223243
commit_func()
@@ -247,45 +267,57 @@ local function take_ready(self)
247267
end
248268
end
249269

250-
local function take(self)
251-
for s, task in self.space.index.status:pairs(state.READY,
252-
{ iterator = 'GE' }) do
253-
if task[2] ~= state.READY then
254-
break
255-
end
256-
-- Taking the minimum is an implicit transactions, so it is
257-
-- always done with 'read-confirmed' mvcc isolation level.
258-
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
259-
-- It is hapenning because 'min' for several takes in parallel will be the same since
260-
-- read confirmed isolation level makes visible all transactions that finished the commit.
261-
-- To fix it we wrap it with box.begin/commit and set right isolation level.
262-
-- Current fix does not resolve that bug in situations when we already are in transaction
263-
-- since it will open nested transactions.
264-
-- See https://github.com/tarantool/queue/issues/207
265-
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
266-
local commit_func = begin_if_not_in_txn()
267-
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
268-
local take_complete = false
270+
local function take_step(self, task, commit_func)
271+
-- Taking the minimum is an implicit transactions, so it is
272+
-- always done with 'read-confirmed' mvcc isolation level.
273+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
274+
-- It is hapenning because 'min' for several takes in parallel will be the same since
275+
-- read confirmed isolation level makes visible all transactions that finished the commit.
276+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
277+
-- Current fix does not resolve that bug in situations when we already are in transaction
278+
-- since it will open nested transactions.
279+
-- See https://github.com/tarantool/queue/issues/207
280+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
281+
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
282+
local take_complete = false
269283

270-
if taken == nil or taken[2] ~= state.TAKEN then
271-
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
272-
take_complete = true
273-
end
284+
if taken == nil or taken[2] ~= state.TAKEN then
285+
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
286+
take_complete = true
287+
end
274288

275-
commit_func()
276-
if take_complete then
277-
self.on_task_change(task, 'take')
278-
return task
279-
end
289+
commit_func()
290+
if take_complete then
291+
self.on_task_change(task, 'take')
292+
return task
280293
end
281294
end
282295

283296
-- take task
284297
function method.take(self)
285298
if self.ready_space_mode then
286-
return take_ready(self)
299+
local commit_body = function(commit_func)
300+
return take_ready(self, commit_func)
301+
end
302+
303+
return try_commit_several_times(commit_body)
304+
end
305+
306+
for _, task in self.space.index.status:pairs(state.READY,
307+
{ iterator = 'GE' }) do
308+
if task[2] ~= state.READY then
309+
break
310+
end
311+
312+
local commit_body = function(commit_func)
313+
return take_step(self, task, commit_func)
314+
end
315+
316+
local ret = try_commit_several_times(commit_body)
317+
if ret ~= nil then
318+
return ret
319+
end
287320
end
288-
return take(self)
289321
end
290322

291323
-- touch task
@@ -300,9 +332,7 @@ local function delete_ready(self, id, utube)
300332
end
301333

302334
-- delete task
303-
function method.delete(self, id)
304-
local commit_func = begin_if_not_in_txn()
305-
335+
local function delete(self, id, commit_func)
306336
local task = self.space:get(id)
307337
self.space:delete(id)
308338
if task ~= nil then
@@ -331,10 +361,17 @@ function method.delete(self, id)
331361
return task
332362
end
333363

334-
-- release task
335-
function method.release(self, id, opts)
336-
local commit_func = begin_if_not_in_txn()
364+
-- delete task
365+
function method.delete(self, id)
366+
local commit_body = function(commit_func)
367+
return delete(self, id, commit_func)
368+
end
369+
370+
return try_commit_several_times(commit_body)
371+
end
337372

373+
-- release task
374+
local function release(self, id, opts, commit_func)
338375
local task = self.space:update(id, {{ '=', 2, state.READY }})
339376
if task ~= nil then
340377
if self.ready_space_mode then
@@ -357,10 +394,17 @@ function method.release(self, id, opts)
357394
return task
358395
end
359396

360-
-- bury task
361-
function method.bury(self, id)
362-
local commit_func = begin_if_not_in_txn()
397+
-- release task
398+
function method.release(self, id, opts)
399+
local commit_body = function(commit_func)
400+
return release(self, id, opts, commit_func)
401+
end
363402

403+
return try_commit_several_times(commit_body)
404+
end
405+
406+
-- bury task
407+
local function bury(self, id, commit_func)
364408
local current_task = self.space:get{id}
365409
local task = self.space:update(id, {{ '=', 2, state.BURIED }})
366410
if task ~= nil then
@@ -390,35 +434,54 @@ function method.bury(self, id)
390434
return task
391435
end
392436

393-
-- unbury several tasks
394-
function method.kick(self, count)
395-
for i = 1, count do
396-
local commit_func = begin_if_not_in_txn()
437+
-- bury task
438+
function method.bury(self, id)
439+
local commit_body = function(commit_func)
440+
return bury(self, id, commit_func)
441+
end
397442

398-
local task = self.space.index.status:min{ state.BURIED }
399-
if task == nil then
400-
return i - 1
401-
end
402-
if task[2] ~= state.BURIED then
403-
return i - 1
404-
end
443+
return try_commit_several_times(commit_body)
444+
end
405445

406-
task = self.space:update(task[1], {{ '=', 2, state.READY }})
407-
if self.ready_space_mode then
408-
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
409-
if prev_task ~= nil then
410-
if prev_task[1] > task[1] then
411-
self.space_ready_buffer:delete(prev_task[1])
412-
self.space_ready_buffer:insert({task[1], task[2]})
413-
end
414-
else
415-
put_ready(self, task[3])
446+
-- unbury several tasks
447+
local function kick_step(self, id, commit_func)
448+
local task = self.space.index.status:min{ state.BURIED }
449+
if task == nil then
450+
return id - 1
451+
end
452+
if task[2] ~= state.BURIED then
453+
return id - 1
454+
end
455+
456+
task = self.space:update(task[1], {{ '=', 2, state.READY }})
457+
if self.ready_space_mode then
458+
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
459+
if prev_task ~= nil then
460+
if prev_task[1] > task[1] then
461+
self.space_ready_buffer:delete(prev_task[1])
462+
self.space_ready_buffer:insert({task[1], task[2]})
416463
end
464+
else
465+
put_ready(self, task[3])
417466
end
467+
end
418468

419-
commit_func()
469+
commit_func()
470+
471+
self.on_task_change(task, 'kick')
472+
end
420473

421-
self.on_task_change(task, 'kick')
474+
-- unbury several tasks
475+
function method.kick(self, count)
476+
for i = 1, count do
477+
local commit_body = function(commit_func)
478+
return kick_step(self, i, commit_func)
479+
end
480+
481+
local ret = try_commit_several_times(commit_body)
482+
if ret ~= nil then
483+
return ret
484+
end
422485
end
423486
return count
424487
end

t/030-utube.t

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,16 @@ test:ok(queue, 'queue is loaded')
1919
local tube = queue.create_tube('test', 'utube', { engine = engine })
2020
local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine })
2121
local tube_ready, tube2_ready
22-
if engine ~= 'vinyl' then
23-
tube_ready = queue.create_tube('test_ready', 'utube',
24-
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
25-
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
26-
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
27-
end
22+
tube_ready = queue.create_tube('test_ready', 'utube',
23+
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
24+
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
25+
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
2826
test:ok(tube, 'test tube created')
2927
test:is(tube.name, 'test', 'tube.name')
3028
test:is(tube.type, 'utube', 'tube.type')
3129

3230
test:test('Utube statistics', function(test)
33-
if engine ~= 'vinyl' then
34-
test:plan(13 * 2)
35-
else
36-
test:plan(13)
37-
end
31+
test:plan(13 * 2)
3832
for _, tube_stat in ipairs({tube2, tube2_ready}) do
3933
if tube_stat == nil then
4034
break
@@ -78,11 +72,7 @@ end)
7872

7973

8074
test:test('Easy put/take/ack', function(test)
81-
if engine ~= 'vinyl' then
82-
test:plan(12 * 2)
83-
else
84-
test:plan(12)
85-
end
75+
test:plan(12 * 2)
8676

8777
for _, test_tube in ipairs({tube, tube_ready}) do
8878
if test_tube == nil then
@@ -110,11 +100,7 @@ test:test('Easy put/take/ack', function(test)
110100
end)
111101

112102
test:test('ack in utube', function(test)
113-
if engine ~= 'vinyl' then
114-
test:plan(8 * 2)
115-
else
116-
test:plan(8)
117-
end
103+
test:plan(8 * 2)
118104

119105
for _, test_tube in ipairs({tube, tube_ready}) do
120106
if test_tube == nil then
@@ -145,11 +131,7 @@ test:test('ack in utube', function(test)
145131
end
146132
end)
147133
test:test('bury in utube', function(test)
148-
if engine ~= 'vinyl' then
149-
test:plan(8 * 2)
150-
else
151-
test:plan(8)
152-
end
134+
test:plan(8 * 2)
153135

154136
for _, test_tube in ipairs({tube, tube_ready}) do
155137
if test_tube == nil then
@@ -180,11 +162,7 @@ test:test('bury in utube', function(test)
180162
end
181163
end)
182164
test:test('instant bury', function(test)
183-
if engine ~= 'vinyl' then
184-
test:plan(1 * 2)
185-
else
186-
test:plan(1)
187-
end
165+
test:plan(1 * 2)
188166
tube:put(1, {ttr=60})
189167
local taken = tube:take(.1)
190168
test:is(tube:bury(taken[1])[2], '!', 'task is buried')

0 commit comments

Comments
 (0)