Skip to content

Commit 2d54447

Browse files
committed
Merge pull request #48 from moteus/master
Update multi-uv example.
2 parents 9a7fd63 + d4bbd66 commit 2d54447

File tree

1 file changed

+182
-78
lines changed

1 file changed

+182
-78
lines changed

examples/cURLv3/multi-uv.lua

+182-78
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
local curl = require "cURL"
22
local uv = require "lluv"
3+
local ut = require "lluv.utils"
34

45
local fprintf = function(f, ...) f:write((string.format(...))) end
56
local printf = function(...) fprintf(io.stdout, ...) end
67

78
local stderr = io.stderr
89

9-
local timeout, curl_handle
10+
local trace = false
11+
12+
trace = trace and print or function() end
1013

1114
local ACTION_NAMES = {
1215
[curl.POLL_IN ] = "POLL_IN";
@@ -15,141 +18,240 @@ local ACTION_NAMES = {
1518
[curl.POLL_NONE ] = "POLL_NONE";
1619
[curl.POLL_REMOVE ] = "POLL_REMOVE";
1720
}
21+
1822
local EVENT_NAMES = {
1923
[ uv.READABLE ] = "READABLE";
2024
[ uv.WRITABLE ] = "WRITABLE";
2125
[ uv.READABLE + uv.WRITABLE ] = "READABLE + WRITABLE";
2226
}
27+
2328
local FLAGS = {
24-
[ uv.READABLE ] = curl.CSELECT_IN;
25-
[ uv.WRITABLE ] = curl.CSELECT_OUT;
29+
[ uv.READABLE ] = curl.CSELECT_IN;
30+
[ uv.WRITABLE ] = curl.CSELECT_OUT;
2631
[ uv.READABLE + uv.WRITABLE ] = curl.CSELECT_IN + curl.CSELECT_OUT;
32+
}
2733

34+
local POLL_IO_FLAGS = {
35+
[ curl.POLL_IN ] = uv.READABLE;
36+
[ curl.POLL_OUT ] = uv.WRITABLE;
37+
[ curl.POLL_INOUT ] = uv.READABLE + uv.WRITABLE;
2838
}
2939

30-
local trace = true
40+
local Context = ut.class() do
3141

32-
trace = trace and print or function() end
42+
function Context:__init(fd)
43+
self._fd = assert(fd)
44+
self._poll = uv.poll_socket(fd)
45+
self._poll.data = {context = self}
46+
47+
assert(self._poll:fileno() == fd)
48+
49+
return self
50+
end
3351

34-
local CONTEXT = {}
52+
function Context:close()
53+
if not self._poll then return end
54+
self._poll.data = nil
55+
self._poll:close()
56+
self._poll, self._fd = nil
57+
end
58+
59+
function Context:poll(...)
60+
self._poll:start(...)
61+
end
62+
63+
function Context:fileno()
64+
return self._fd
65+
end
3566

36-
function create_curl_context(sockfd)
37-
local context = {
38-
sockfd = sockfd;
39-
poll_handle = uv.poll_socket(sockfd);
40-
}
41-
context.poll_handle.data = context
42-
43-
return context
4467
end
4568

46-
function destroy_curl_context(context)
47-
context.poll_handle:close()
48-
end
69+
-- Number of parallel request
70+
local MAX_REQUESTS
71+
local timer, multi
72+
local qtask = ut.Queue.new() -- wait tasks
73+
local qfree = ut.Queue.new() -- avaliable easy handles
74+
local qeasy = {} -- all easy handles
4975

50-
function add_download(url, num)
76+
local function on_begin(handle, url, num)
5177
local filename = tostring(num) .. ".download"
5278
local file = io.open(filename, "w")
5379
if not file then
5480
fprintf(stderr, "Error opening %s\n", filename)
5581
return
5682
end
83+
handle.data.file = file
84+
handle:setopt_writefunction(file)
5785

58-
local handle = curl.easy{
59-
url = url;
60-
writefunction = file;
61-
}
86+
fprintf(stderr, "Added download %s -> %s\n", url, filename);
87+
return true
88+
end
6289

63-
handle.data = file
90+
local function on_end(handle, err, url)
91+
handle.data.file:close()
92+
handle.data.file = nil
6493

65-
curl_handle:add_handle(handle)
66-
fprintf(stderr, "Added download %s -> %s\n", url, filename);
94+
if err then
95+
printf("%s ERROR - %s\n", url, tostring(err));
96+
else
97+
printf("%s DONE\n", url);
98+
end
6799
end
68100

69-
function check_multi_info()
70-
while true do
71-
local easy, ok, err = curl_handle:info_read(true)
72-
if not easy then curl_handle:close() error(err) end
73-
if easy == 0 then break end
101+
local function cleanup()
102+
timer:close()
74103

75-
local context = CONTEXT[e]
76-
if context then destroy_curl_context(context) end
77-
local file = assert(easy.data)
78-
file:close()
79-
local done_url = easy:getinfo_effective_url()
104+
for i, easy in ipairs(qeasy) do
105+
multi:remove_handle(easy)
80106
easy:close()
81-
if ok then
82-
printf("%s DONE\n", done_url);
83-
elseif data == "error" then
84-
printf("%s ERROR - %s\n", done_url, tostring(err));
85-
end
86107
end
108+
109+
multi:close()
87110
end
88111

89-
function curl_perform(handle, err, events)
90-
-- calls by libuv --
91-
trace("UV::POLL", handle, err, EVENT_NAMES[events] or events)
112+
local proceed_queue, add_download do
92113

93-
local flags = assert(FLAGS[events], ("unknown event:" .. events))
114+
proceed_queue = function()
115+
while true do
116+
if qtask:empty() then return end
117+
118+
if qfree:empty() then
119+
if #qeasy < MAX_REQUESTS then
120+
local easy = assert(curl.easy())
121+
qeasy[#qeasy + 1] = easy
122+
qfree:push(easy)
123+
else
124+
return
125+
end
126+
end
94127

95-
context = handle.data
128+
local task = assert(qtask:pop())
129+
local url, num = task[1], task[2]
96130

97-
curl_handle:socket_action(context.sockfd, flags)
131+
local handle = assert(qfree:pop())
98132

99-
check_multi_info()
133+
handle:setopt{
134+
url = url;
135+
fresh_connect = true;
136+
forbid_reuse = true;
137+
}
138+
139+
handle.data = {}
140+
141+
if on_begin(handle, url, num) then
142+
multi:add_handle(handle)
143+
else
144+
handle:reset().data = nil
145+
qfree:push(handle)
146+
end
147+
end
100148
end
101149

102-
function on_timeout(timer)
103-
-- calls by libuv --
104-
trace("UV::TIMEOUT", timer)
150+
add_download = function(url, num)
151+
qtask:push{url, num}
105152

106-
local running_handles, err = curl_handle:socket_action()
153+
proceed_queue()
154+
end
107155

108-
check_multi_info()
109156
end
110157

111-
function start_timeout(timeout_ms)
158+
local on_libuv_poll, on_libuv_timeout
159+
160+
local on_curl_timeout, on_curl_action do
161+
162+
on_curl_timeout = function(ms)
112163
-- calls by curl --
113-
trace("CURL::TIMEOUT", timeout_ms)
164+
trace("CURL::TIMEOUT", ms)
114165

115-
-- 0 means directly call socket_action, but we'll do it in a bit
116-
if timeout_ms <= 0 then timeout_ms = 1 end
166+
if ms <= 0 then ms = 1 end
117167

118-
timeout:stop():start(timeout_ms, 0, on_timeout)
168+
timer:start(ms, 0, on_libuv_timeout)
119169
end
120170

121-
function handle_socket(easy, s, action)
171+
on_curl_action = function(easy, fd, action)
122172
local ok, err = pcall(function()
123-
-- calls by curl --
124173
trace("CURL::SOCKET", easy, s, ACTION_NAMES[action] or action)
125174

126-
local curl_context = CONTEXT[easy] or create_curl_context(s)
127-
CONTEXT[easy] = curl_context
128-
129-
assert(curl_context.sockfd == s)
175+
local context = easy.data.context
130176

131-
if action == curl.POLL_IN then
132-
curl_context.poll_handle:start(uv.READABLE, curl_perform)
133-
elseif action == curl.POLL_OUT then
134-
curl_context.poll_handle:start(uv.WRITABLE, curl_perform)
177+
local flag = POLL_IO_FLAGS[action]
178+
if flag then
179+
if not context then
180+
context = Context.new(fd)
181+
easy.data.context = context
182+
end
183+
context:poll(flag, on_libuv_poll)
135184
elseif action == curl.POLL_REMOVE then
136-
CONTEXT[easy] = nil
137-
destroy_curl_context(curl_context)
185+
if context then
186+
easy.data.context = nil
187+
context:close()
188+
end
138189
end
139190
end)
191+
140192
if not ok then uv.defer(function() error(err) end) end
141193
end
142194

143-
timeout = uv.timer()
195+
end
144196

145-
curl_handle = curl.multi{
146-
socketfunction = handle_socket;
147-
timerfunction = start_timeout;
148-
}
197+
-- on_libuv_poll, on_libuv_timeout
198+
do
199+
200+
local curl_check_multi_info = function()
201+
while true do
202+
local easy, ok, err = multi:info_read(true)
203+
204+
if not easy then
205+
multi:close()
206+
error(err)
207+
end
208+
209+
if easy == 0 then break end
149210

150-
curl_handle = curl.multi{
151-
socketfunction = handle_socket;
152-
timerfunction = start_timeout;
211+
local done_url = easy:getinfo_effective_url()
212+
213+
local context = easy.data.context
214+
if context then context:close() end
215+
easy.data.context = nil
216+
217+
if ok then on_end(easy, nil, done_url) else on_end(easy, err, done_url) end
218+
219+
easy:reset().data = nil
220+
qfree:push(easy)
221+
end
222+
223+
proceed_queue()
224+
end
225+
226+
on_libuv_poll = function(handle, err, events)
227+
trace("UV::POLL", handle, err, EVENT_NAMES[events] or events)
228+
229+
local flags = assert(FLAGS[events], ("unknown event:" .. events))
230+
231+
context = handle.data.context
232+
233+
multi:socket_action(context:fileno(), flags)
234+
235+
curl_check_multi_info()
236+
end
237+
238+
on_libuv_timeout = function(timer)
239+
trace("UV::TIMEOUT", timer)
240+
241+
local running_handles, err = multi:socket_action()
242+
243+
curl_check_multi_info()
244+
end
245+
246+
end
247+
248+
MAX_REQUESTS = 64
249+
250+
timer = uv.timer()
251+
252+
multi = curl.multi{
253+
timerfunction = on_curl_timeout;
254+
socketfunction = on_curl_action;
153255
}
154256

155257
for i = 1, math.huge do
@@ -158,4 +260,6 @@ for i = 1, math.huge do
158260
add_download(url, i)
159261
end
160262

161-
uv.run(loop, UV_RUN_DEFAULT)
263+
uv.run()
264+
265+
cleanup()

0 commit comments

Comments
 (0)