Skip to content

Commit b4a7579

Browse files
committed
Update multi-uv example.
1 parent 1738369 commit b4a7579

File tree

1 file changed

+160
-78
lines changed

1 file changed

+160
-78
lines changed

examples/cURLv3/multi-uv.lua

Lines changed: 160 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
local curl = require "cURL"
22
local uv = require "lluv"
3+
local ut = require "lluv.utils"
34

45
local fprintf = function(f, ...) f:write((string.format(...))) end
56
local printf = function(...) fprintf(io.stdout, ...) end
67

78
local stderr = io.stderr
89

9-
local timeout, curl_handle
10+
local trace = false
11+
12+
trace = trace and print or function() end
1013

1114
local ACTION_NAMES = {
1215
[curl.POLL_IN ] = "POLL_IN";
@@ -15,141 +18,218 @@ local ACTION_NAMES = {
1518
[curl.POLL_NONE ] = "POLL_NONE";
1619
[curl.POLL_REMOVE ] = "POLL_REMOVE";
1720
}
21+
1822
local EVENT_NAMES = {
1923
[ uv.READABLE ] = "READABLE";
2024
[ uv.WRITABLE ] = "WRITABLE";
2125
[ uv.READABLE + uv.WRITABLE ] = "READABLE + WRITABLE";
2226
}
27+
2328
local FLAGS = {
24-
[ uv.READABLE ] = curl.CSELECT_IN;
25-
[ uv.WRITABLE ] = curl.CSELECT_OUT;
29+
[ uv.READABLE ] = curl.CSELECT_IN;
30+
[ uv.WRITABLE ] = curl.CSELECT_OUT;
2631
[ uv.READABLE + uv.WRITABLE ] = curl.CSELECT_IN + curl.CSELECT_OUT;
27-
2832
}
2933

30-
local trace = true
34+
local Context = ut.class() do
3135

32-
trace = trace and print or function() end
36+
function Context:__init(fd)
37+
self._fd = assert(fd)
38+
self._poll = uv.poll_socket(fd)
39+
self._poll.data = {context = self}
3340

34-
local CONTEXT = {}
41+
assert(self._poll:fileno() == fd)
3542

36-
function create_curl_context(sockfd)
37-
local context = {
38-
sockfd = sockfd;
39-
poll_handle = uv.poll_socket(sockfd);
40-
}
41-
context.poll_handle.data = context
42-
43-
return context
43+
return self
44+
end
45+
46+
function Context:close()
47+
if not self._poll then return end
48+
self._poll.data = nil
49+
self._poll:close()
50+
self._poll, self._fd = nil
51+
end
52+
53+
function Context:poll(...)
54+
self._poll:start(...)
55+
end
56+
57+
function Context:fileno()
58+
return self._fd
59+
end
60+
61+
end
62+
63+
-- Number of parallel request
64+
local MAX_REQUESTS
65+
local timer, multi
66+
local qtask = ut.Queue.new() -- wait tasks
67+
local qfree = ut.Queue.new() -- avaliable easy handles
68+
local qeasy = {} -- all easy handles
69+
70+
local function cleanup()
71+
timer:close()
72+
73+
for i, easy in ipairs(qeasy) do
74+
multi:remove_handle(easy)
75+
easy:close()
76+
end
77+
78+
multi:close()
4479
end
4580

46-
function destroy_curl_context(context)
47-
context.poll_handle:close()
48-
end
81+
local proceed_queue, add_download do
82+
83+
proceed_queue = function()
84+
if qtask:empty() then return end
85+
86+
if qfree:empty() then
87+
if #qeasy < MAX_REQUESTS then
88+
local easy = assert(curl.easy())
89+
qeasy[#qeasy + 1] = easy
90+
qfree:push(easy)
91+
else
92+
return
93+
end
94+
end
95+
96+
local task = assert(qtask:pop())
97+
local url, num = task[1], task[2]
4998

50-
function add_download(url, num)
5199
local filename = tostring(num) .. ".download"
52100
local file = io.open(filename, "w")
53101
if not file then
54102
fprintf(stderr, "Error opening %s\n", filename)
55103
return
56104
end
57105

58-
local handle = curl.easy{
106+
local handle = assert(qfree:pop())
107+
108+
handle:setopt{
59109
url = url;
60110
writefunction = file;
61111
}
62112

63-
handle.data = file
113+
handle.data = { file = file }
114+
115+
multi:add_handle(handle)
64116

65-
curl_handle:add_handle(handle)
66117
fprintf(stderr, "Added download %s -> %s\n", url, filename);
67118
end
68119

69-
function check_multi_info()
120+
add_download = function(url, num)
121+
qtask:push{url, num}
122+
123+
proceed_queue()
124+
end
125+
126+
end
127+
128+
local on_libuv_poll, on_libuv_timeout
129+
130+
local on_curl_timeout, on_curl_action do
131+
132+
on_curl_timeout = function(ms)
133+
-- calls by curl --
134+
trace("CURL::TIMEOUT", ms)
135+
136+
if ms <= 0 then ms = 1 end
137+
138+
timer:start(ms, 0, on_libuv_timeout)
139+
end
140+
141+
on_curl_action = function(easy, fd, action)
142+
local ok, err = pcall(function()
143+
trace("CURL::SOCKET", easy, s, ACTION_NAMES[action] or action)
144+
145+
local context = easy.data.context
146+
if (action == curl.POLL_IN) or (action == curl.POLL_OUT) then
147+
if not context then
148+
context = Context.new(fd)
149+
easy.data.context = context
150+
end
151+
end
152+
153+
assert(context:fileno() == fd)
154+
155+
if action == curl.POLL_IN then context:poll(uv.READABLE, on_libuv_poll)
156+
elseif action == curl.POLL_OUT then context:poll(uv.WRITABLE, on_libuv_poll)
157+
elseif action == curl.POLL_REMOVE then
158+
if context then
159+
easy.data.context = nil
160+
context:close()
161+
end
162+
end
163+
end)
164+
165+
if not ok then uv.defer(function() error(err) end) end
166+
end
167+
168+
end
169+
170+
-- on_libuv_poll, on_libuv_timeout
171+
do
172+
173+
local curl_check_multi_info = function()
70174
while true do
71-
local easy, ok, err = curl_handle:info_read(true)
72-
if not easy then curl_handle:close() error(err) end
175+
local easy, ok, err = multi:info_read(true)
176+
177+
if not easy then
178+
multi:close()
179+
error(err)
180+
end
181+
73182
if easy == 0 then break end
74183

75-
local context = CONTEXT[e]
76-
if context then destroy_curl_context(context) end
77-
local file = assert(easy.data)
78-
file:close()
79184
local done_url = easy:getinfo_effective_url()
80-
easy:close()
185+
186+
local context = easy.data.context
187+
if context then context:close() end
188+
local file = assert(easy.data.file)
189+
file:close()
190+
191+
easy.data = nil
192+
qfree:push(easy)
193+
81194
if ok then
82195
printf("%s DONE\n", done_url);
83196
elseif data == "error" then
84197
printf("%s ERROR - %s\n", done_url, tostring(err));
85198
end
199+
200+
proceed_queue()
86201
end
87202
end
88203

89-
function curl_perform(handle, err, events)
90-
-- calls by libuv --
204+
on_libuv_poll = function(handle, err, events)
91205
trace("UV::POLL", handle, err, EVENT_NAMES[events] or events)
92206

93207
local flags = assert(FLAGS[events], ("unknown event:" .. events))
94208

95-
context = handle.data
209+
context = handle.data.context
96210

97-
curl_handle:socket_action(context.sockfd, flags)
211+
multi:socket_action(context:fileno(), flags)
98212

99-
check_multi_info()
213+
curl_check_multi_info()
100214
end
101215

102-
function on_timeout(timer)
103-
-- calls by libuv --
216+
on_libuv_timeout = function(timer)
104217
trace("UV::TIMEOUT", timer)
105218

106-
local running_handles, err = curl_handle:socket_action()
107-
108-
check_multi_info()
109-
end
110-
111-
function start_timeout(timeout_ms)
112-
-- calls by curl --
113-
trace("CURL::TIMEOUT", timeout_ms)
219+
local running_handles, err = multi:socket_action()
114220

115-
-- 0 means directly call socket_action, but we'll do it in a bit
116-
if timeout_ms <= 0 then timeout_ms = 1 end
117-
118-
timeout:stop():start(timeout_ms, 0, on_timeout)
221+
curl_check_multi_info()
119222
end
120223

121-
function handle_socket(easy, s, action)
122-
local ok, err = pcall(function()
123-
-- calls by curl --
124-
trace("CURL::SOCKET", easy, s, ACTION_NAMES[action] or action)
125-
126-
local curl_context = CONTEXT[easy] or create_curl_context(s)
127-
CONTEXT[easy] = curl_context
128-
129-
assert(curl_context.sockfd == s)
130-
131-
if action == curl.POLL_IN then
132-
curl_context.poll_handle:start(uv.READABLE, curl_perform)
133-
elseif action == curl.POLL_OUT then
134-
curl_context.poll_handle:start(uv.WRITABLE, curl_perform)
135-
elseif action == curl.POLL_REMOVE then
136-
CONTEXT[easy] = nil
137-
destroy_curl_context(curl_context)
138-
end
139-
end)
140-
if not ok then uv.defer(function() error(err) end) end
141224
end
142225

143-
timeout = uv.timer()
226+
MAX_REQUESTS = 64
144227

145-
curl_handle = curl.multi{
146-
socketfunction = handle_socket;
147-
timerfunction = start_timeout;
148-
}
228+
timer = uv.timer()
149229

150-
curl_handle = curl.multi{
151-
socketfunction = handle_socket;
152-
timerfunction = start_timeout;
230+
multi = curl.multi{
231+
timerfunction = on_curl_timeout;
232+
socketfunction = on_curl_action;
153233
}
154234

155235
for i = 1, math.huge do
@@ -158,4 +238,6 @@ for i = 1, math.huge do
158238
add_download(url, i)
159239
end
160240

161-
uv.run(loop, UV_RUN_DEFAULT)
241+
uv.run()
242+
243+
cleanup()

0 commit comments

Comments
 (0)