Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 09a7a76

Browse files
committedMar 2, 2019
curl
1 parent 0f28ef0 commit 09a7a76

39 files changed

+3659
-537
lines changed
 

‎example/curl/curl.cc

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
#include "httpcurl.h"
2+
3+
static void dummy(const std::shared_ptr<Channel> &)
4+
{
5+
6+
}
7+
8+
Request::Request(Curl *owner, const char *url, const char *data, size_t len)
9+
: owner(owner),
10+
curl(curl_easy_init())
11+
{
12+
setopt(CURLOPT_URL, url);
13+
setopt(CURLOPT_WRITEFUNCTION, &Request::writeData);
14+
setopt(CURLOPT_WRITEDATA, this);
15+
setopt(CURLOPT_HEADERFUNCTION, &Request::headerData);
16+
setopt(CURLOPT_HEADERDATA, this);
17+
setopt(CURLOPT_PRIVATE, this);
18+
setopt(CURLOPT_USERAGENT, "curl");
19+
setopt(CURLOPT_POST, 1L);
20+
setopt(CURLOPT_POSTFIELDS, data);
21+
setopt(CURLOPT_POSTFIELDSIZE, len);
22+
23+
LOG_DEBUG << curl << " " << url;
24+
curl_multi_add_handle(owner->getCurlm(), curl);
25+
}
26+
27+
Request::Request(Curl *owner, const char *url)
28+
: owner(owner),
29+
curl(curl_easy_init())
30+
{
31+
setopt(CURLOPT_URL, url);
32+
setopt(CURLOPT_WRITEFUNCTION, &Request::writeData);
33+
setopt(CURLOPT_WRITEDATA, this);
34+
setopt(CURLOPT_HEADERFUNCTION, &Request::headerData);
35+
setopt(CURLOPT_HEADERDATA, this);
36+
setopt(CURLOPT_PRIVATE, this);
37+
setopt(CURLOPT_USERAGENT, "curl");
38+
39+
LOG_DEBUG << curl << " " << url;
40+
curl_multi_add_handle(owner->getCurlm(), curl);
41+
}
42+
43+
Request::~Request()
44+
{
45+
assert(!channel || channel->isNoneEvent());
46+
curl_multi_remove_handle(owner->getCurlm(), curl);
47+
curl_easy_cleanup(curl);
48+
}
49+
50+
void Request::headerOnly()
51+
{
52+
setopt(CURLOPT_NOBODY, 1);
53+
}
54+
55+
void Request::setRange(const std::string_view range)
56+
{
57+
setopt(CURLOPT_RANGE, range.data());
58+
}
59+
60+
const char *Request::getEffectiveUrl()
61+
{
62+
const char *p = nullptr;
63+
curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &p);
64+
return p;
65+
}
66+
67+
const char *Request::getRedirectUrl()
68+
{
69+
const char *p = nullptr;
70+
curl_easy_getinfo(curl, CURLINFO_REDIRECT_URL, &p);
71+
return p;
72+
}
73+
74+
int Request::getResponseCode()
75+
{
76+
long code = 0;
77+
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
78+
return static_cast<int>(code);
79+
}
80+
81+
Channel *Request::setChannel(int fd)
82+
{
83+
assert(channel.get() == nullptr);
84+
channel.reset(new Channel(owner->getLoop(), fd));
85+
channel->setTie(shared_from_this());
86+
return channel.get();
87+
}
88+
89+
void Request::removeChannel()
90+
{
91+
channel->disableAll();
92+
channel->remove();
93+
owner->getLoop()->queueInLoop(std::bind(dummy, channel));
94+
channel.reset();
95+
}
96+
97+
void Request::done(int code)
98+
{
99+
if (doneCb)
100+
{
101+
doneCb(this, code);
102+
}
103+
}
104+
105+
void Request::dataCallback(const char *buffer, int len)
106+
{
107+
if (dataCb)
108+
{
109+
dataCb(this, buffer, len);
110+
}
111+
}
112+
113+
void Request::headerCallback(const char *buffer, int len)
114+
{
115+
if (headerCb)
116+
{
117+
headerCb(buffer, len);
118+
}
119+
}
120+
121+
size_t Request::writeData(char *buffer, size_t size, size_t nmemb, void *userp)
122+
{
123+
assert(size == 1);
124+
Request *req = static_cast<Request*>(userp);
125+
req->dataCallback(buffer, static_cast<int>(nmemb));
126+
return nmemb;
127+
}
128+
129+
size_t Request::headerData(char *buffer, size_t size, size_t nmemb, void *userp)
130+
{
131+
assert(size == 1);
132+
Request *req = static_cast<Request*>(userp);
133+
req->headerCallback(buffer, static_cast<int>(nmemb));
134+
return nmemb;
135+
}
136+
137+
// ==================================================================
138+
139+
void Curl::initialize(Option opt)
140+
{
141+
curl_global_init(opt == kCURLnossl ? CURL_GLOBAL_NOTHING : CURL_GLOBAL_SSL);
142+
}
143+
144+
int Curl::socketCallback(CURL *c, int fd, int what, void *userp, void *socketp)
145+
{
146+
Curl *curl = static_cast<Curl*>(userp);
147+
const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
148+
LOG_DEBUG << "Curl::socketCallback [" << curl << "] - fd = " << fd
149+
<< " what = " << whatstr[what];
150+
Request *req = nullptr;
151+
curl_easy_getinfo(c, CURLINFO_PRIVATE, &req);
152+
assert(req->getCurl() == c);
153+
if (what == CURL_POLL_REMOVE)
154+
{
155+
Channel *ch = static_cast<Channel*>(socketp);
156+
assert(req->getChannel() == ch);
157+
req->removeChannel();
158+
ch = nullptr;
159+
curl_multi_assign(curl->curlm, fd, ch);
160+
}
161+
else
162+
{
163+
Channel *ch = static_cast<Channel*>(socketp);
164+
if (!ch)
165+
{
166+
ch = req->setChannel(fd);
167+
ch->setReadCallback(std::bind(&Curl::onRead, curl, fd));
168+
ch->setWriteCallback(std::bind(&Curl::onWrite, curl, fd));
169+
ch->enableReading();
170+
curl_multi_assign(curl->curlm, fd, ch);
171+
LOG_TRACE << "new channel for fd=" << fd;
172+
}
173+
174+
assert(req->getChannel() == ch);
175+
// update
176+
if (what & CURL_POLL_OUT)
177+
{
178+
ch->enableWriting();
179+
}
180+
else
181+
{
182+
ch->disableWriting();
183+
}
184+
}
185+
return 0;
186+
}
187+
188+
int Curl::timerCallback(CURLM *curlm, long ms, void *userp)
189+
{
190+
Curl *curl = static_cast<Curl*>(userp);
191+
LOG_DEBUG << curl << " " << ms << " ms";
192+
curl->loop->runAfter(static_cast<int>(ms)/1000.0, false, std::bind(&Curl::onTimer, curl));
193+
return 0;
194+
}
195+
196+
Curl::Curl(EventLoop *loop)
197+
: loop(loop),
198+
curlm(curl_multi_init()),
199+
runningHandles(0),
200+
prevRunningHandles(0)
201+
{
202+
curl_multi_setopt(curlm, CURLMOPT_SOCKETFUNCTION, &Curl::socketCallback);
203+
curl_multi_setopt(curlm, CURLMOPT_SOCKETDATA, this);
204+
curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, &Curl::timerCallback);
205+
curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, this);
206+
}
207+
208+
Curl::~Curl()
209+
{
210+
curl_multi_cleanup(curlm);
211+
}
212+
213+
RequestPtr Curl::getUrl(std::string_view url)
214+
{
215+
RequestPtr req(new Request(this, url.data()));
216+
return req;
217+
}
218+
219+
RequestPtr Curl::getUrl(std::string_view url, std::string_view body)
220+
{
221+
RequestPtr req(new Request(this, url.data(), body.data(), body.size()));
222+
return req;
223+
}
224+
225+
void Curl::onTimer()
226+
{
227+
CURLMcode rc = CURLM_OK;
228+
do
229+
{
230+
LOG_TRACE;
231+
rc = curl_multi_socket_action(curlm, CURL_SOCKET_TIMEOUT, 0, &runningHandles);
232+
LOG_TRACE << rc << " " << runningHandles;
233+
} while (rc == CURLM_CALL_MULTI_PERFORM);
234+
checkFinish();
235+
}
236+
237+
void Curl::onRead(int fd)
238+
{
239+
CURLMcode rc = CURLM_OK;
240+
do
241+
{
242+
LOG_TRACE << fd;
243+
rc = curl_multi_socket_action(curlm, fd, CURL_POLL_IN, &runningHandles);
244+
LOG_TRACE << fd << " " << rc << " " << runningHandles;
245+
} while (rc == CURLM_CALL_MULTI_PERFORM);
246+
checkFinish();
247+
}
248+
249+
void Curl::onWrite(int fd)
250+
{
251+
CURLMcode rc = CURLM_OK;
252+
do
253+
{
254+
LOG_TRACE << fd;
255+
rc = curl_multi_socket_action(curlm, fd, CURL_POLL_OUT, &runningHandles);
256+
LOG_TRACE << fd << " " << rc << " " << runningHandles;
257+
} while (rc == CURLM_CALL_MULTI_PERFORM);
258+
checkFinish();
259+
}
260+
261+
void Curl::checkFinish()
262+
{
263+
if (prevRunningHandles > runningHandles || runningHandles == 0)
264+
{
265+
CURLMsg *msg = nullptr;
266+
int left = 0;
267+
while ( (msg = curl_multi_info_read(curlm, &left)) != nullptr)
268+
{
269+
if (msg->msg == CURLMSG_DONE)
270+
{
271+
CURL *c = msg->easy_handle;
272+
CURLcode res = msg->data.result;
273+
Request *req = nullptr;
274+
curl_easy_getinfo(c, CURLINFO_PRIVATE, &req);
275+
assert(req->getCurl() == c);
276+
LOG_TRACE << req << " done";
277+
req->done(res);
278+
}
279+
}
280+
}
281+
prevRunningHandles = runningHandles;
282+
}
283+

‎example/curl/curl.h

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#pragma once
2+
#include "all.h"
3+
#include "eventloop.h"
4+
#include "channel.h"
5+
#include "tcpserver.h"
6+
#include "log.h"
7+
#include <curl/curl.h>
8+
9+
extern "C"
10+
{
11+
typedef void CURLM;
12+
typedef void CURL;
13+
}
14+
15+
class Curl;
16+
class Request : public std::enable_shared_from_this<Request>
17+
{
18+
public:
19+
typedef std::function<void(Request *, const char *, int)> DataCallback;
20+
typedef std::function<void(const char *, int)> HeadCallback;
21+
typedef std::function<void(Request *, int)> DoneCallback;
22+
23+
Request(Curl *, const char *url);
24+
Request(Curl *, const char *url, const char *data, size_t len);
25+
26+
~Request();
27+
28+
void setDataCallback(const DataCallback &cb)
29+
{ dataCb = cb; }
30+
31+
void setDoneCallback(const DoneCallback &cb)
32+
{ doneCb = cb; }
33+
34+
void setHeaderCallback(const HeadCallback &cb)
35+
{ headerCb = cb; }
36+
37+
void headerOnly();
38+
39+
void setRange(const std::string_view range);
40+
41+
template<typename OPT>
42+
int setopt(OPT opt, long p)
43+
{
44+
return curl_easy_setopt(curl, opt, p);
45+
}
46+
47+
template<typename OPT>
48+
int setopt(OPT opt, const char *p)
49+
{
50+
return curl_easy_setopt(curl, opt, p);
51+
}
52+
53+
template<typename OPT>
54+
int setopt(OPT opt, void *p)
55+
{
56+
return curl_easy_setopt(curl, opt, p);
57+
}
58+
59+
template<typename OPT>
60+
int setopt(OPT opt, size_t (*p)(char *, size_t , size_t , void *))
61+
{
62+
return curl_easy_setopt(curl, opt, p);
63+
}
64+
65+
TcpConnectionPtr getTcpConnection() { return conn; }
66+
void setTcpConnection(const TcpConnectionPtr &connection) { conn = connection; }
67+
const char *getEffectiveUrl();
68+
const char *getRedirectUrl();
69+
int getResponseCode();
70+
71+
// internal
72+
Channel *setChannel(int fd);
73+
void removeChannel();
74+
void done(int code);
75+
CURL *getCurl() { return curl; }
76+
Channel *getChannel() { return channel.get(); }
77+
private:
78+
Request(const Request&);
79+
void operator=(const Request&);
80+
81+
void dataCallback(const char *buffer, int len);
82+
void headerCallback(const char *buffer, int len);
83+
static size_t writeData(char *buffer, size_t size, size_t nmemb, void *userp);
84+
static size_t headerData(char *buffer, size_t size, size_t nmemb, void *userp);
85+
void doneCallback();
86+
87+
class Curl *owner;
88+
CURL *curl;
89+
std::shared_ptr<Channel> channel;
90+
DataCallback dataCb;
91+
HeadCallback headerCb;
92+
DoneCallback doneCb;
93+
TcpConnectionPtr conn;
94+
};
95+
96+
typedef std::shared_ptr<Request> RequestPtr;
97+
class Curl
98+
{
99+
public:
100+
enum Option
101+
{
102+
kCURLnossl = 0,
103+
kCURLssl = 1,
104+
};
105+
106+
explicit Curl(EventLoop *loop);
107+
~Curl();
108+
109+
RequestPtr getUrl(std::string_view url);
110+
RequestPtr getUrl(std::string_view url, std::string_view body);
111+
112+
static void initialize(Option opt = kCURLnossl);
113+
114+
// internal
115+
CURLM *getCurlm() { return curlm; }
116+
EventLoop* getLoop() { return loop; }
117+
118+
private:
119+
Curl(const Curl&);
120+
void operator=(const Curl&);
121+
122+
void onTimer();
123+
void onRead(int fd);
124+
void onWrite(int fd);
125+
void checkFinish();
126+
127+
static int socketCallback(CURL *, int, int, void*, void*);
128+
static int timerCallback(CURLM *, long, void*);
129+
130+
EventLoop *loop;
131+
CURLM *curlm;
132+
int runningHandles;
133+
int prevRunningHandles;
134+
};
135+

‎example/curl/main.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#include "httpcurl.h"
2+
3+
4+
EventLoop* g_loop = NULL;
5+
6+
void onData(Request* c, const char* data, int len)
7+
{
8+
printf("data %s\n", data);
9+
}
10+
11+
void onHead(const char* data, int len)
12+
{
13+
printf("data1 %s\n", data);
14+
}
15+
16+
void done(Request* c, int code)
17+
{
18+
printf("done %p %s %d\n", c, c->getEffectiveUrl(), code);
19+
}
20+
21+
void done2(Request* c, int code)
22+
{
23+
printf("done2 %p %s %d %d\n", c, c->getRedirectUrl(), c->getResponseCode(), code);
24+
//g_loop->quit();
25+
}
26+
27+
int main(int argc, char* argv[])
28+
{
29+
EventLoop loop;
30+
g_loop = &loop;
31+
loop.runAfter(5.0, std::bind(&EventLoop::quit, &loop));
32+
Curl::initialize(Curl::kCURLnossl);
33+
Curl curl(&loop);
34+
signal(SIGPIPE, SIG_IGN);
35+
36+
RequestPtr req = curl.getUrl("http://192.168.6.1:8000/");
37+
req->setDataCallback(onData);
38+
req->setDoneCallback(done2);
39+
req->setHeaderCallback(onHead);
40+
41+
curl::RequestPtr req2 = curl.getUrl("https://github.com");
42+
//req2->allowRedirect(5);
43+
req2->setDataCallback(onData);
44+
req2->setDoneCallback(done);
45+
loop.run();
46+
}

‎example/evcpp/main.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
#include "evcpp.h"
2-
3-
void onConnect(evcpp::TcpConnectionPtr conn)
4-
{
5-
printf("onConnect %zd\n",conn.use_count());
6-
}
7-
8-
int main()
9-
{
10-
evcpp::EventLoop loop;
11-
evcpp::Listener listener(&loop,1234);
12-
listener.setNewConnectionCallback(onConnect);
13-
loop.loop();
14-
}
1+
#include "evcpp.h"
2+
3+
void onConnect(evcpp::TcpConnectionPtr conn)
4+
{
5+
printf("onConnect %zd\n",conn.use_count());
6+
}
7+
8+
int main()
9+
{
10+
evcpp::EventLoop loop;
11+
evcpp::Listener listener(&loop,1234);
12+
listener.setNewConnectionCallback(onConnect);
13+
loop.loop();
14+
}

‎example/filesearch/client.cc

Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
#include "all.h"
2+
#include "tcpconnection.h"
3+
#include "tcpclient.h"
4+
#include "eventloop.h"
5+
#include "log.h"
6+
#include "table.h"
7+
#include "posix.h"
8+
#include "tablebuilder.h"
9+
#include "memtable.h"
10+
#include "option.h"
11+
12+
const size_t kMaxSize = 10 * 1000 * 1000;
13+
enum SaverState
14+
{
15+
kNotFound,
16+
kFound,
17+
kDeleted,
18+
kCorrupt,
19+
};
20+
21+
class Sharder
22+
{
23+
public:
24+
explicit Sharder(int32_t nbuckets)
25+
{
26+
for (int32_t i = 0; i < nbuckets; ++i)
27+
{
28+
char buf[256];
29+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets);
30+
std::shared_ptr<std::ofstream> in(new std::ofstream(buf));
31+
buckets.push_back(in);
32+
}
33+
assert(buckets.size() == static_cast<size_t>(nbuckets));
34+
}
35+
36+
void output(const std::string &key, const std::string &value)
37+
{
38+
size_t idx = std::hash<std::string>()(key) % buckets.size();
39+
*buckets[idx] << key << '\t' << value << '\n';
40+
}
41+
42+
protected:
43+
std::vector<std::shared_ptr<std::ofstream>> buckets;
44+
private:
45+
Sharder(const Sharder&);
46+
void operator=(const Sharder&);
47+
};
48+
// ======= merge =======
49+
50+
class Source // copyable
51+
{
52+
public:
53+
explicit Source(const std::shared_ptr<std::ifstream> &in)
54+
: in(in)
55+
{
56+
57+
}
58+
59+
bool next()
60+
{
61+
std::string line;
62+
if (getline(*(in.get()), line))
63+
{
64+
size_t tab = line.find('\t');
65+
if (tab != std::string::npos)
66+
{
67+
std::string key(line.c_str(), line.c_str() + tab);
68+
this->key = key;
69+
std::string(line.c_str() + tab + 1, line.c_str() + line.size());
70+
this->value = value;
71+
return true;
72+
}
73+
}
74+
return false;
75+
}
76+
77+
bool operator<(const Source &rhs) const
78+
{
79+
return key < rhs.key;
80+
}
81+
82+
std::string getKey() { MD5(key).toStr(); }
83+
std::string getValue() { return value; }
84+
void outputTo(std::ostream &out)
85+
{
86+
out << key << '\t' << value << '\n';
87+
}
88+
89+
std::shared_ptr<std::ifstream> in;
90+
std::string key;
91+
std::string value;
92+
};
93+
94+
class Client
95+
{
96+
public:
97+
Client(const char *ip, uint16_t port, int32_t argc, char *argv[])
98+
:ip(ip),
99+
port(port),
100+
nbuckets(10)
101+
{
102+
EventLoop loop;
103+
this->loop = &loop;
104+
TcpClient client(&loop, ip, port, nullptr);
105+
client.setConnectionCallback(std::bind(&Client::connectionCallback, this, std::placeholders::_1));
106+
client.setMessageCallback(std::bind(&Client::messageCallback, this, std::placeholders::_1, std::placeholders::_2));
107+
client.enableRetry();
108+
client.connect(true);
109+
this->client = &client;
110+
111+
shard(nbuckets, argc, argv);
112+
sortShareds(nbuckets);
113+
merge(nbuckets);
114+
}
115+
116+
void writeCompleteCallback(const TcpConnectionPtr &conn)
117+
{
118+
conn->startRead();
119+
conn->setWriteCompleteCallback(WriteCompleteCallback());
120+
}
121+
122+
void highWaterCallback(const TcpConnectionPtr &conn, size_t bytesToSent)
123+
{
124+
LOG_INFO << " bytes " << bytesToSent;
125+
if (conn->outputBuffer()->readableBytes() > 0)
126+
{
127+
conn->stopRead();
128+
conn->setWriteCompleteCallback(
129+
std::bind(&Client::writeCompleteCallback, this, std::placeholders::_1));
130+
}
131+
}
132+
133+
void connectionCallback(const TcpConnectionPtr &conn)
134+
{
135+
conn->setHighWaterMarkCallback(
136+
std::bind(&Client::highWaterCallback, this, std::placeholders::_1, std::placeholders::_2),
137+
1024 * 1024);
138+
}
139+
140+
void cmp(const std::any &arg, const std::string_view &ikey, const std::string_view &v)
141+
{
142+
auto c = ikey.compare(v);
143+
if (c == 0)
144+
{
145+
state = kCorrupt;
146+
}
147+
else
148+
{
149+
state = kFound;
150+
}
151+
}
152+
153+
void messageCallback(const TcpConnectionPtr &conn, Buffer *buffer)
154+
{
155+
const char *crlf;
156+
while ((crlf = buffer->findCRLF()) != nullptr)
157+
{
158+
// string request(buffer->peek(), crlf);
159+
// printf("%s\n", request.c_str());
160+
const char *tab = std::find(buffer->peek(), crlf, '\t');
161+
if (tab != crlf)
162+
{
163+
std::string key(buffer->peek(), tab);
164+
std::string value(tab + 1, crlf);
165+
166+
Status s = table->internalGet(ReadOptions(), key, nullptr, std::bind(&Client::cmp, this,
167+
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
168+
169+
if (!s.ok())
170+
{
171+
return s;
172+
}
173+
174+
switch (state)
175+
{
176+
case kNotFound:
177+
files[key] = value;
178+
break; // Keep searching in other files
179+
case kFound:
180+
break;
181+
}
182+
}
183+
else
184+
{
185+
LOG_WARN << "Wrong format, no tab found";
186+
conn->shutdown();
187+
}
188+
buffer->retrieveUntil(crlf + 2);
189+
}
190+
}
191+
192+
void merge(const int32_t nbuckets)
193+
{
194+
std::vector<std::shared_ptr<std::ifstream>> inputs;
195+
std::vector<Source> keys;
196+
197+
for (int32_t i = 0; i < nbuckets; ++i)
198+
{
199+
char buf[256];
200+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
201+
std::shared_ptr<std::ifstream> in(new std::ifstream(buf));
202+
inputs.push_back(in);
203+
Source rec(in);
204+
if (rec.next())
205+
{
206+
keys.push_back(rec);
207+
}
208+
::unlink(buf);
209+
}
210+
211+
Options options;
212+
std::shared_ptr<WritableFile> wfile;
213+
s = options.env->newWritableFile("output", wfile);
214+
assert(s.ok());
215+
TableBuilder builder(options, wfile);
216+
217+
std::make_heap(keys.begin(), keys.end());
218+
while (!keys.empty())
219+
{
220+
std::pop_heap(keys.begin(), keys.end());
221+
builder.add(keys.back().getKey(), keys.back().getValue());
222+
assert(builder.status().ok());
223+
224+
if (keys.back().next())
225+
{
226+
std::push_heap(keys.begin(), keys.end());
227+
}
228+
else
229+
{
230+
keys.pop_back();
231+
}
232+
}
233+
234+
s = builder.finish();
235+
assert(s.ok());
236+
s = wfile->sync();
237+
assert(s.ok());
238+
s = wfile->close();
239+
assert(s.ok());
240+
241+
uint64_t size;
242+
s = options.env->getFileSize(filename,&size);
243+
assert (s.ok());
244+
assert(size == builder.fileSize());
245+
246+
std::shared_ptr<RandomAccessFile> mfile = nullptr;
247+
s = options.env->newRandomAccessFile(filename, mfile);
248+
assert(s.ok());
249+
s = Table::open(options, mfile, builder.fileSize(), table);
250+
assert(s.ok());
251+
}
252+
253+
void shard(int32_t nbuckets, int32_t argc, char *argv[])
254+
{
255+
Sharder sharder(nbuckets);
256+
for (int32_t i = 1; i < argc; ++i)
257+
{
258+
std::cout << " processing input file " << argv[i] << std::endl;
259+
std::map<std::string, std::string> shareds;
260+
std::ifstream in(argv[i]);
261+
while (in && !in.eof())
262+
{
263+
shareds.clear();
264+
std::string str;
265+
while (in >> str)
266+
{
267+
shareds.insert(std::make_pair(str, str));
268+
if (shareds.size() > kMaxSize)
269+
{
270+
std::cout << " split" << std::endl;
271+
break;
272+
}
273+
}
274+
275+
for (const auto &kv : shareds)
276+
{
277+
sharder.output(kv.first, kv.second);
278+
}
279+
}
280+
}
281+
std::cout << "shuffling done" << std::endl;
282+
}
283+
284+
std::map<std::string, std::string> readShard(int32_t idx, int32_t nbuckets)
285+
{
286+
std::map<std::string, std::string> shareds;
287+
288+
char buf[256];
289+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
290+
std::cout << " reading " << buf << std::endl;
291+
{
292+
std::ifstream in(buf);
293+
std::string line;
294+
295+
while (getline(in, line))
296+
{
297+
size_t tab = line.find('\t');
298+
assert(tab != std::string::npos);
299+
std::string key(line.c_str(), line.c_str() + tab);
300+
std::string value(line.c_str() + tab + 1, line.c_str() + line.size());
301+
shareds.insert(std::make_pair(key, value));
302+
}
303+
}
304+
305+
::unlink(buf);
306+
return shareds;
307+
}
308+
309+
void sortShareds(const int32_t nbuckets)
310+
{
311+
for (int32_t i = 0; i < nbuckets; ++i)
312+
{
313+
// std::cout << " sorting " << std::endl;
314+
std::map<std::string, std::string> shareds;
315+
for (const auto &entry : readShard(i, nbuckets))
316+
{
317+
shareds.insert(std::make_pair(entry.first, entry.second));
318+
}
319+
320+
char buf[256];
321+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
322+
std::ofstream out(buf);
323+
std::cout << " writing " << buf << std::endl;
324+
for (auto &it : shareds)
325+
{
326+
out << it.first << '\t' << it.second << '\n';
327+
}
328+
}
329+
std::cout << "reducing done" << std::endl;
330+
}
331+
332+
void run()
333+
{
334+
loop->run();
335+
}
336+
private:
337+
Client(const Client&);
338+
void operator=(const Client&);
339+
340+
EventLoop *loop;
341+
TcpClient *client;
342+
const char *ip;
343+
uint16_t port;
344+
int32_t nbuckets;
345+
std::shared_ptr<Table> table;
346+
std::map<std::string, std::string> files;
347+
Status state;
348+
349+
};
350+
351+
int main(int argc, char* argv[])
352+
{
353+
Client client("127.0.0.1", 8888, argc, argv);
354+
client.run();
355+
return 0;
356+
}

‎example/filesearch/filesort.cc

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#include <unistd.h>
2+
#include <memory>
3+
#include <fstream>
4+
#include <iostream>
5+
#include <unordered_map>
6+
#include <map>
7+
#include <vector>
8+
#include <set>
9+
#include <assert.h>
10+
#include <algorithm>
11+
12+
int32_t index = 0;
13+
const size_t kMaxSize = 10 * 1000 * 1000;
14+
15+
class Sharder
16+
{
17+
public:
18+
explicit Sharder(int32_t nbuckets)
19+
{
20+
for (int32_t i = 0; i < nbuckets; ++i)
21+
{
22+
char buf[256];
23+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets);
24+
std::shared_ptr<std::ofstream> in(new std::ofstream(buf));
25+
buckets.push_back(in);
26+
}
27+
assert(buckets.size() == static_cast<size_t>(nbuckets));
28+
}
29+
30+
void output(const std::string &key, const std::string &value)
31+
{
32+
size_t idx = std::hash<std::string>()(key) % buckets.size();
33+
*buckets[idx] << key << '\t' << value << '\n';
34+
}
35+
36+
protected:
37+
std::vector<std::shared_ptr<std::ofstream>> buckets;
38+
private:
39+
Sharder(const Sharder&);
40+
void operator=(const Sharder&);
41+
};
42+
43+
void shard(int32_t nbuckets, int32_t argc, char *argv[])
44+
{
45+
Sharder sharder(nbuckets);
46+
for (int32_t i = 1; i < argc; ++i)
47+
{
48+
std::cout << " processing input file " << argv[i] << std::endl;
49+
std::map<std::string, std::string> shareds;
50+
std::ifstream in(argv[i]);
51+
while (in && !in.eof())
52+
{
53+
shareds.clear();
54+
std::string str;
55+
while (in >> str)
56+
{
57+
shareds.insert(std::make_pair(str, str));
58+
if (shareds.size() > kMaxSize)
59+
{
60+
std::cout << " split" << std::endl;
61+
break;
62+
}
63+
}
64+
65+
for (const auto &kv : shareds)
66+
{
67+
sharder.output(kv.first, kv.second);
68+
}
69+
}
70+
}
71+
std::cout << "shuffling done" << std::endl;
72+
}
73+
74+
// ======= sortShareds =======
75+
76+
std::map<std::string, std::string> readShard(int32_t idx, int32_t nbuckets)
77+
{
78+
std::map<std::string, std::string> shareds;
79+
80+
char buf[256];
81+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
82+
std::cout << " reading " << buf << std::endl;
83+
{
84+
std::ifstream in(buf);
85+
std::string line;
86+
87+
while (getline(in, line))
88+
{
89+
size_t tab = line.find('\t');
90+
assert(tab != std::string::npos);
91+
std::string key(line.c_str(), line.c_str() + tab);
92+
std::string value(line.c_str() + tab + 1, line.c_str() + line.size());
93+
shareds.insert(std::make_pair(key, value));
94+
}
95+
}
96+
97+
::unlink(buf);
98+
return shareds;
99+
}
100+
101+
void sortShareds(const int32_t nbuckets)
102+
{
103+
for (int32_t i = 0; i < nbuckets; ++i)
104+
{
105+
// std::cout << " sorting " << std::endl;
106+
std::map<std::string, std::string> shareds;
107+
for (const auto &entry : readShard(i, nbuckets))
108+
{
109+
shareds.insert(std::make_pair(entry.first, entry.second));
110+
}
111+
112+
char buf[256];
113+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
114+
std::ofstream out(buf);
115+
std::cout << " writing " << buf << std::endl;
116+
for (auto &it : shareds)
117+
{
118+
out << it.first << '\t' << it.second << '\n';
119+
}
120+
}
121+
std::cout << "reducing done" << std::endl;
122+
}
123+
124+
// ======= merge =======
125+
126+
class Source // copyable
127+
{
128+
public:
129+
explicit Source(const std::shared_ptr<std::ifstream> &in)
130+
: in(in)
131+
{
132+
133+
}
134+
135+
bool next()
136+
{
137+
std::string line;
138+
if (getline(*(in.get()), line))
139+
{
140+
size_t tab = line.find('\t');
141+
if (tab != std::string::npos)
142+
{
143+
std::string key(line.c_str(), line.c_str() + tab);
144+
this->key = key;
145+
std::string(line.c_str() + tab + 1, line.c_str() + line.size());
146+
this->value = value;
147+
return true;
148+
}
149+
}
150+
return false;
151+
}
152+
153+
bool operator<(const Source &rhs) const
154+
{
155+
return key < rhs.key;
156+
}
157+
158+
void retrieve()
159+
{
160+
index += key.size() + 3 + value.size() + sizeof(int32_t)
161+
}
162+
163+
void outputTo(std::ostream &out, int32_t index) const
164+
{
165+
out << key << '\t' << value << '\t' << std::to_string(index) << '\n';
166+
retrieve();
167+
}
168+
169+
std::shared_ptr<std::ifstream> in;
170+
std::string key;
171+
std::string value;
172+
};
173+
174+
void merge(const int32_t nbuckets)
175+
{
176+
std::vector<std::shared_ptr<std::ifstream>> inputs;
177+
std::vector<Source> keys;
178+
179+
for (int32_t i = 0; i < nbuckets; ++i)
180+
{
181+
char buf[256];
182+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
183+
std::shared_ptr<std::ifstream> in(new std::ifstream(buf));
184+
inputs.push_back(in);
185+
Source rec(in);
186+
if (rec.next())
187+
{
188+
keys.push_back(rec);
189+
}
190+
::unlink(buf);
191+
}
192+
193+
std::ofstream out("output");
194+
std::make_heap(keys.begin(), keys.end());
195+
196+
while (!keys.empty())
197+
{
198+
std::pop_heap(keys.begin(), keys.end());
199+
keys.back().outputTo(out, index);
200+
201+
if (keys.back().next())
202+
{
203+
std::push_heap(keys.begin(), keys.end());
204+
}
205+
else
206+
{
207+
keys.pop_back();
208+
}
209+
}
210+
}
211+
212+
int32_t main(int32_t argc, char *argv[])
213+
{
214+
int32_t nbuckets = 10;
215+
shard(nbuckets, argc, argv);
216+
sortShareds(nbuckets);
217+
merge(nbuckets);
218+
}

‎example/filesearch/gen.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/python
2+
3+
import random
4+
5+
words = 100
6+
word_len = 10
7+
alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-'
8+
9+
output = open('random_words', 'w')
10+
for x in xrange(words):
11+
arr = [random.choice(alphabet) for i in range(word_len)]
12+
word = ''.join(arr)
13+
output.write(word)
14+
output.write('\n')

‎example/filesearch/md5.cc

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
#include "md5.h"
2+
3+
/* Define the static member of MD5. */
4+
const byte MD5::PADDING[64] = { 0x80 };
5+
const char MD5::HEX_NUMBERS[16] = {
6+
'0', '1', '2', '3',
7+
'4', '5', '6', '7',
8+
'8', '9', 'a', 'b',
9+
'c', 'd', 'e', 'f'
10+
};
11+
12+
MD5::MD5(const string& message) {
13+
finished = false;
14+
/* Reset number of bits. */
15+
count[0] = count[1] = 0;
16+
/* Initialization constants. */
17+
state[0] = 0x67452301;
18+
state[1] = 0xefcdab89;
19+
state[2] = 0x98badcfe;
20+
state[3] = 0x10325476;
21+
22+
/* Initialization the object according to message. */
23+
init((const byte*)message.c_str(), message.length());
24+
}
25+
26+
const byte* MD5::getDigest() {
27+
if (!finished) {
28+
finished = true;
29+
30+
byte bits[8];
31+
bit32 oldState[4];
32+
bit32 oldCount[2];
33+
bit32 index, padLen;
34+
35+
/* Save current state and count. */
36+
memcpy(oldState, state, 16);
37+
memcpy(oldCount, count, 8);
38+
39+
/* Save number of bits */
40+
encode(count, bits, 8);
41+
42+
/* Pad out to 56 mod 64. */
43+
index = (bit32)((count[0] >> 3) & 0x3f);
44+
padLen = (index < 56) ? (56 - index) : (120 - index);
45+
init(PADDING, padLen);
46+
47+
/* Append length (before padding) */
48+
init(bits, 8);
49+
50+
/* Store state in digest */
51+
encode(state, digest, 16);
52+
53+
/* Restore current state and count. */
54+
memcpy(state, oldState, 16);
55+
memcpy(count, oldCount, 8);
56+
}
57+
return digest;
58+
}
59+
60+
void MD5::init(const byte* input, size_t len) {
61+
62+
bit32 i, index, partLen;
63+
64+
finished = false;
65+
66+
/* Compute number of bytes mod 64 */
67+
index = (bit32)((count[0] >> 3) & 0x3f);
68+
69+
/* update number of bits */
70+
if ((count[0] += ((bit32)len << 3)) < ((bit32)len << 3)) {
71+
++count[1];
72+
}
73+
count[1] += ((bit32)len >> 29);
74+
75+
partLen = 64 - index;
76+
77+
/* transform as many times as possible. */
78+
if (len >= partLen) {
79+
80+
memcpy(&buffer[index], input, partLen);
81+
transform(buffer);
82+
83+
for (i = partLen; i + 63 < len; i += 64) {
84+
transform(&input[i]);
85+
}
86+
index = 0;
87+
88+
}
89+
else {
90+
i = 0;
91+
}
92+
93+
/* Buffer remaining input */
94+
memcpy(&buffer[index], &input[i], len - i);
95+
}
96+
97+
void MD5::transform(const byte block[64]) {
98+
99+
bit32 a = state[0], b = state[1], c = state[2], d = state[3], x[16];
100+
101+
decode(block, x, 64);
102+
103+
/* Round 1 */
104+
FF(a, b, c, d, x[0], s11, 0xd76aa478);
105+
FF(d, a, b, c, x[1], s12, 0xe8c7b756);
106+
FF(c, d, a, b, x[2], s13, 0x242070db);
107+
FF(b, c, d, a, x[3], s14, 0xc1bdceee);
108+
FF(a, b, c, d, x[4], s11, 0xf57c0faf);
109+
FF(d, a, b, c, x[5], s12, 0x4787c62a);
110+
FF(c, d, a, b, x[6], s13, 0xa8304613);
111+
FF(b, c, d, a, x[7], s14, 0xfd469501);
112+
FF(a, b, c, d, x[8], s11, 0x698098d8);
113+
FF(d, a, b, c, x[9], s12, 0x8b44f7af);
114+
FF(c, d, a, b, x[10], s13, 0xffff5bb1);
115+
FF(b, c, d, a, x[11], s14, 0x895cd7be);
116+
FF(a, b, c, d, x[12], s11, 0x6b901122);
117+
FF(d, a, b, c, x[13], s12, 0xfd987193);
118+
FF(c, d, a, b, x[14], s13, 0xa679438e);
119+
FF(b, c, d, a, x[15], s14, 0x49b40821);
120+
121+
/* Round 2 */
122+
GG(a, b, c, d, x[1], s21, 0xf61e2562);
123+
GG(d, a, b, c, x[6], s22, 0xc040b340);
124+
GG(c, d, a, b, x[11], s23, 0x265e5a51);
125+
GG(b, c, d, a, x[0], s24, 0xe9b6c7aa);
126+
GG(a, b, c, d, x[5], s21, 0xd62f105d);
127+
GG(d, a, b, c, x[10], s22, 0x2441453);
128+
GG(c, d, a, b, x[15], s23, 0xd8a1e681);
129+
GG(b, c, d, a, x[4], s24, 0xe7d3fbc8);
130+
GG(a, b, c, d, x[9], s21, 0x21e1cde6);
131+
GG(d, a, b, c, x[14], s22, 0xc33707d6);
132+
GG(c, d, a, b, x[3], s23, 0xf4d50d87);
133+
GG(b, c, d, a, x[8], s24, 0x455a14ed);
134+
GG(a, b, c, d, x[13], s21, 0xa9e3e905);
135+
GG(d, a, b, c, x[2], s22, 0xfcefa3f8);
136+
GG(c, d, a, b, x[7], s23, 0x676f02d9);
137+
GG(b, c, d, a, x[12], s24, 0x8d2a4c8a);
138+
139+
/* Round 3 */
140+
HH(a, b, c, d, x[5], s31, 0xfffa3942);
141+
HH(d, a, b, c, x[8], s32, 0x8771f681);
142+
HH(c, d, a, b, x[11], s33, 0x6d9d6122);
143+
HH(b, c, d, a, x[14], s34, 0xfde5380c);
144+
HH(a, b, c, d, x[1], s31, 0xa4beea44);
145+
HH(d, a, b, c, x[4], s32, 0x4bdecfa9);
146+
HH(c, d, a, b, x[7], s33, 0xf6bb4b60);
147+
HH(b, c, d, a, x[10], s34, 0xbebfbc70);
148+
HH(a, b, c, d, x[13], s31, 0x289b7ec6);
149+
HH(d, a, b, c, x[0], s32, 0xeaa127fa);
150+
HH(c, d, a, b, x[3], s33, 0xd4ef3085);
151+
HH(b, c, d, a, x[6], s34, 0x4881d05);
152+
HH(a, b, c, d, x[9], s31, 0xd9d4d039);
153+
HH(d, a, b, c, x[12], s32, 0xe6db99e5);
154+
HH(c, d, a, b, x[15], s33, 0x1fa27cf8);
155+
HH(b, c, d, a, x[2], s34, 0xc4ac5665);
156+
157+
/* Round 4 */
158+
II(a, b, c, d, x[0], s41, 0xf4292244);
159+
II(d, a, b, c, x[7], s42, 0x432aff97);
160+
II(c, d, a, b, x[14], s43, 0xab9423a7);
161+
II(b, c, d, a, x[5], s44, 0xfc93a039);
162+
II(a, b, c, d, x[12], s41, 0x655b59c3);
163+
II(d, a, b, c, x[3], s42, 0x8f0ccc92);
164+
II(c, d, a, b, x[10], s43, 0xffeff47d);
165+
II(b, c, d, a, x[1], s44, 0x85845dd1);
166+
II(a, b, c, d, x[8], s41, 0x6fa87e4f);
167+
II(d, a, b, c, x[15], s42, 0xfe2ce6e0);
168+
II(c, d, a, b, x[6], s43, 0xa3014314);
169+
II(b, c, d, a, x[13], s44, 0x4e0811a1);
170+
II(a, b, c, d, x[4], s41, 0xf7537e82);
171+
II(d, a, b, c, x[11], s42, 0xbd3af235);
172+
II(c, d, a, b, x[2], s43, 0x2ad7d2bb);
173+
II(b, c, d, a, x[9], s44, 0xeb86d391);
174+
175+
state[0] += a;
176+
state[1] += b;
177+
state[2] += c;
178+
state[3] += d;
179+
}
180+
181+
void MD5::encode(const bit32* input, byte* output, size_t length) {
182+
183+
for (size_t i = 0, j = 0; j < length; ++i, j += 4) {
184+
output[j] = (byte)(input[i] & 0xff);
185+
output[j + 1] = (byte)((input[i] >> 8) & 0xff);
186+
output[j + 2] = (byte)((input[i] >> 16) & 0xff);
187+
output[j + 3] = (byte)((input[i] >> 24) & 0xff);
188+
}
189+
}
190+
191+
void MD5::decode(const byte* input, bit32* output, size_t length) {
192+
for (size_t i = 0, j = 0; j < length; ++i, j += 4) {
193+
output[i] = ((bit32)input[j]) | (((bit32)input[j + 1]) << 8) |
194+
(((bit32)input[j + 2]) << 16) | (((bit32)input[j + 3]) << 24);
195+
}
196+
}
197+
198+
string MD5::toStr() {
199+
const byte* digest_ = getDigest();
200+
string str;
201+
str.reserve(16 << 1);
202+
for (size_t i = 0; i < 16; ++i) {
203+
int t = digest_[i];
204+
int a = t / 16;
205+
int b = t % 16;
206+
str.append(1, HEX_NUMBERS[a]);
207+
str.append(1, HEX_NUMBERS[b]);
208+
}
209+
return str;
210+
}

‎example/filesearch/md5.h

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#define s11 7
2+
#define s12 12
3+
#define s13 17
4+
#define s14 22
5+
#define s21 5
6+
#define s22 9
7+
#define s23 14
8+
#define s24 20
9+
#define s31 4
10+
#define s32 11
11+
#define s33 16
12+
#define s34 23
13+
#define s41 6
14+
#define s42 10
15+
#define s43 15
16+
#define s44 21
17+
18+
#define F(x, y, z) (((x) & (y)) | ((~x) & (z)))
19+
#define G(x, y, z) (((x) & (z)) | ((y) & (~z)))
20+
#define H(x, y, z) ((x) ^ (y) ^ (z))
21+
#define I(x, y, z) ((y) ^ ((x) | (~z)))
22+
23+
#define ROTATELEFT(num, n) (((num) << (n)) | ((num) >> (32-(n))))
24+
25+
#define FF(a, b, c, d, x, s, ac) { \
26+
(a) += F ((b), (c), (d)) + (x) + ac; \
27+
(a) = ROTATELEFT ((a), (s)); \
28+
(a) += (b); \
29+
}
30+
#define GG(a, b, c, d, x, s, ac) { \
31+
(a) += G ((b), (c), (d)) + (x) + ac; \
32+
(a) = ROTATELEFT ((a), (s)); \
33+
(a) += (b); \
34+
}
35+
#define HH(a, b, c, d, x, s, ac) { \
36+
(a) += H ((b), (c), (d)) + (x) + ac; \
37+
(a) = ROTATELEFT ((a), (s)); \
38+
(a) += (b); \
39+
}
40+
#define II(a, b, c, d, x, s, ac) { \
41+
(a) += I ((b), (c), (d)) + (x) + ac; \
42+
(a) = ROTATELEFT ((a), (s)); \
43+
(a) += (b); \
44+
}
45+
46+
#include <string>
47+
#include <cstring>
48+
49+
using std::string;
50+
51+
/* Define of btye.*/
52+
typedef unsigned char byte;
53+
/* Define of byte. */
54+
typedef unsigned int bit32;
55+
56+
class MD5 {
57+
public:
58+
/* Construct a MD5 object with a string. */
59+
MD5(const string& message);
60+
61+
/* Generate md5 digest. */
62+
const byte* getDigest();
63+
64+
/* Convert digest to string value */
65+
string toStr();
66+
67+
private:
68+
/* Initialization the md5 object, processing another message block,
69+
* and updating the context.*/
70+
void init(const byte* input, size_t len);
71+
72+
/* MD5 basic transformation. Transforms state based on block. */
73+
void transform(const byte block[64]);
74+
75+
/* Encodes input (usigned long) into output (byte). */
76+
void encode(const bit32* input, byte* output, size_t length);
77+
78+
/* Decodes input (byte) into output (usigned long). */
79+
void decode(const byte* input, bit32* output, size_t length);
80+
81+
private:
82+
/* Flag for mark whether calculate finished. */
83+
bool finished;
84+
85+
/* state (ABCD). */
86+
bit32 state[4];
87+
88+
/* number of bits, low-order word first. */
89+
bit32 count[2];
90+
91+
/* input buffer. */
92+
byte buffer[64];
93+
94+
/* message digest. */
95+
byte digest[16];
96+
97+
/* padding for calculate. */
98+
static const byte PADDING[64];
99+
100+
/* Hex numbers. */
101+
static const char HEX_NUMBERS[16];
102+
};
103+

‎example/filesearch/server.cc

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#include "all.h"
2+
#include "tcpconnection.h"
3+
#include "tcpserver.h"
4+
#include "eventloop.h"
5+
#include "log.h"
6+
#include "md5.h"
7+
namespace fs = std::experimental::filesystem;
8+
9+
class Server
10+
{
11+
public:
12+
Server(const char *ip, uint16_t port)
13+
:ip(ip),
14+
port(port),
15+
path("filedata/")
16+
{
17+
EventLoop loop;
18+
this->loop = &loop;
19+
TcpServer server(&loop, ip, port, nullptr);
20+
server.setConnectionCallback(std::bind(&Server::connectionCallback, this, std::placeholders::_1));
21+
server.setMessageCallback(std::bind(&Server::messageCallback, this, std::placeholders::_1, std::placeholders::_2));
22+
server.start();
23+
}
24+
25+
void readAllFile(const TcpConnectionPtr &conn)
26+
{
27+
std::string fileName;
28+
for (auto &iter : fs::directory_iterator(path))
29+
{
30+
auto fe = iter.path();
31+
32+
fileName.clear();
33+
fileName += path;
34+
fileName += fe.filename().string();
35+
36+
if (!fs::exists(fileName))
37+
{
38+
exit(1);
39+
}
40+
41+
std::ifstream fin(fileName.c_str());
42+
if (fin.eof())
43+
{
44+
continue;
45+
}
46+
47+
int i = 0;
48+
std::string line;
49+
while(std::getline(fin,line, '\n'))
50+
{
51+
Buffer *buffer = conn->outputBuffer();
52+
buffer->append(MD5(line).toStr());
53+
buffer->append("\r\n", 2);
54+
conn->sendPipe();
55+
}
56+
fin.close();
57+
}
58+
}
59+
60+
void writeCompleteCallback(const TcpConnectionPtr &conn)
61+
{
62+
conn->startRead();
63+
conn->setWriteCompleteCallback(WriteCompleteCallback());
64+
}
65+
66+
void highWaterCallback(const TcpConnectionPtr &conn, size_t bytesToSent)
67+
{
68+
LOG_INFO << " bytes " << bytesToSent;
69+
if (conn->outputBuffer()->readableBytes() > 0)
70+
{
71+
conn->stopRead();
72+
conn->setWriteCompleteCallback(
73+
std::bind(&Server::writeCompleteCallback, this, std::placeholders::_1));
74+
}
75+
}
76+
77+
void connectionCallback(const TcpConnectionPtr &conn)
78+
{
79+
conn->setHighWaterMarkCallback(
80+
std::bind(&Server::highWaterCallback, this, std::placeholders::_1, std::placeholders::_2),
81+
1024 * 1024);
82+
readAllFile(conn);
83+
}
84+
85+
void messageCallback(const TcpConnectionPtr &conn, Buffer *buffer)
86+
{
87+
buffer->retrieveAll();
88+
}
89+
90+
void readFileData(const TcpConnectionPtr &conn)
91+
{
92+
conn->shutdown();
93+
}
94+
95+
void run()
96+
{
97+
loop->run();
98+
}
99+
private:
100+
Server(const Server&);
101+
void operator=(const Server&);
102+
103+
EventLoop *loop;
104+
const char *ip;
105+
uint16_t port;
106+
const char *path;
107+
};
108+
109+
int main(int argc, char* argv[])
110+
{
111+
Server server("127.0.0.1", 8888);
112+
server.run();
113+
return 0;
114+
}

‎example/http/httptest.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ void onMessage(const HttpRequest &req, HttpResponse *resp)
5050
}
5151
else if (req.getPath() == "/test.txt")
5252
{
53-
std::string filename = "attachment;filename=";
54-
filename += "test.txt";
53+
std::string filename = "attachment;filename=test.txt";
5554
resp->setStatusCode(HttpResponse::k200k);
5655
resp->setStatusMessage("OK");
5756
resp->setContentType("text/plain");

‎example/leveldb/.DS_Store

8 KB
Binary file not shown.

‎example/leveldb/example/cachetest.cc

Lines changed: 108 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,108 @@
1-
#include "dbimpl.h"
2-
#include "option.h"
3-
#include "cache.h"
4-
#include "coding.h"
5-
#include <vector>
6-
#include <functional>
7-
8-
// Conversions between numeric keys/values and the types expected by Cache.
9-
static std::string encodeKey(int k)
10-
{
11-
std::string result;
12-
putFixed32(&result, k);
13-
return result;
14-
}
15-
16-
static int decodeKey(const std::string_view &k)
17-
{
18-
assert(k.size() == 4);
19-
return decodeFixed32(k.data());
20-
}
21-
22-
static int decodeValue(const std::any &v)
23-
{
24-
assert(v.has_value());
25-
return std::any_cast<int>(v);
26-
}
27-
28-
class CacheTest
29-
{
30-
public:
31-
void deleterCallBack(const std::string_view &key, const std::any &v)
32-
{
33-
deletedKeys.push_back(decodeKey(key));
34-
deletedValues.push_back(decodeValue(v));
35-
}
36-
37-
CacheTest()
38-
:cache(new ShardedLRUCache(kCacheSize))
39-
{
40-
41-
}
42-
43-
int lookup(int key)
44-
{
45-
auto handle = cache->lookup(encodeKey(key));
46-
const int r = (handle == nullptr) ? -1 : decodeValue(cache->value(handle));
47-
if (handle != nullptr)
48-
{
49-
cache->erase(encodeKey(key));
50-
}
51-
return r;
52-
}
53-
54-
void insert(int key, int value, int charge = 1)
55-
{
56-
cache->insert(encodeKey(key), value, charge,
57-
std::bind(&CacheTest::deleterCallBack, this,
58-
std::placeholders::_1, std::placeholders::_2));
59-
}
60-
61-
void erase(int key)
62-
{
63-
cache->erase(encodeKey(key));
64-
}
65-
66-
void hitAndMiss()
67-
{
68-
assert(lookup(100) == -1);
69-
insert(100, 101);
70-
assert(lookup(100) == 101);
71-
assert(lookup(200) == -1);
72-
assert(lookup(300) == -1);
73-
74-
insert(200, 201);
75-
assert(lookup(100) == 101);
76-
assert(lookup(200) == 201);
77-
assert(lookup(300) == -1);
78-
79-
insert(100, 102);
80-
assert(lookup(100) == 102);
81-
assert(lookup(200) == 201);
82-
assert(lookup(300) == -1);
83-
84-
assert(deletedKeys.size() == 1);
85-
assert(deletedKeys[0] == 100);
86-
assert(deletedValues[0] == 101);
87-
}
88-
89-
void erase()
90-
{
91-
92-
}
93-
94-
private:
95-
static const int kCacheSize = 1000;
96-
std::vector<int> deletedKeys;
97-
std::vector<int> deletedValues;
98-
std::shared_ptr<ShardedLRUCache> cache;
99-
};
100-
101-
int main(int argc, char *argv[])
102-
{
103-
CacheTest test;
104-
test.hitAndMiss();
105-
test.erase();
106-
return 0;
107-
108-
}
1+
#include "dbimpl.h"
2+
#include "option.h"
3+
#include "cache.h"
4+
#include "coding.h"
5+
#include <vector>
6+
#include <functional>
7+
8+
// Conversions between numeric keys/values and the types expected by Cache.
9+
static std::string encodeKey(int k)
10+
{
11+
std::string result;
12+
putFixed32(&result, k);
13+
return result;
14+
}
15+
16+
static int decodeKey(const std::string_view &k)
17+
{
18+
assert(k.size() == 4);
19+
return decodeFixed32(k.data());
20+
}
21+
22+
static int decodeValue(const std::any &v)
23+
{
24+
assert(v.has_value());
25+
return std::any_cast<int>(v);
26+
}
27+
28+
class CacheTest
29+
{
30+
public:
31+
void deleterCallBack(const std::string_view &key, const std::any &v)
32+
{
33+
deletedKeys.push_back(decodeKey(key));
34+
deletedValues.push_back(decodeValue(v));
35+
}
36+
37+
CacheTest()
38+
:cache(new ShardedLRUCache(kCacheSize))
39+
{
40+
41+
}
42+
43+
int lookup(int key)
44+
{
45+
auto handle = cache->lookup(encodeKey(key));
46+
const int r = (handle == nullptr) ? -1 : decodeValue(cache->value(handle));
47+
if (handle != nullptr)
48+
{
49+
cache->erase(encodeKey(key));
50+
}
51+
return r;
52+
}
53+
54+
void insert(int key, int value, int charge = 1)
55+
{
56+
cache->insert(encodeKey(key), value, charge,
57+
std::bind(&CacheTest::deleterCallBack, this,
58+
std::placeholders::_1, std::placeholders::_2));
59+
}
60+
61+
void erase(int key)
62+
{
63+
cache->erase(encodeKey(key));
64+
}
65+
66+
void hitAndMiss()
67+
{
68+
assert(lookup(100) == -1);
69+
insert(100, 101);
70+
assert(lookup(100) == 101);
71+
assert(lookup(200) == -1);
72+
assert(lookup(300) == -1);
73+
74+
insert(200, 201);
75+
assert(lookup(100) == 101);
76+
assert(lookup(200) == 201);
77+
assert(lookup(300) == -1);
78+
79+
insert(100, 102);
80+
assert(lookup(100) == 102);
81+
assert(lookup(200) == 201);
82+
assert(lookup(300) == -1);
83+
84+
assert(deletedKeys.size() == 1);
85+
assert(deletedKeys[0] == 100);
86+
assert(deletedValues[0] == 101);
87+
}
88+
89+
void erase()
90+
{
91+
92+
}
93+
94+
private:
95+
static const int kCacheSize = 1000;
96+
std::vector<int> deletedKeys;
97+
std::vector<int> deletedValues;
98+
std::shared_ptr<ShardedLRUCache> cache;
99+
};
100+
101+
int main(int argc, char *argv[])
102+
{
103+
CacheTest test;
104+
test.hitAndMiss();
105+
test.erase();
106+
return 0;
107+
108+
}

‎example/leveldb/example/tabletest.cc

Lines changed: 749 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#pragma once
2+
#include "dbimpl.h"
3+

‎example/leveldb/src/dbformat.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,13 @@ class BytewiseComparatorImpl : public Comparator
134134
{
135135

136136
}
137+
137138
virtual const char *name() const { return "leveldb.BytewiseComparator"; }
138139
virtual int compare(const std::string_view &a, const std::string_view &b) const
139140
{
140141
return a.compare(b);
141142
}
143+
142144
virtual void findShortestSeparator(std::string *start, const std::string_view &limit) const;
143145
virtual void findShortSuccessor(std::string *key) const;
144146
};

‎example/leveldb/src/dbimpl.cc

Lines changed: 175 additions & 82 deletions
Large diffs are not rendered by default.

‎example/leveldb/src/dbimpl.h

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@
1313
#include "posix.h"
1414
#include "tablecache.h"
1515

16+
// A range of keys
17+
struct Range
18+
{
19+
std::string_view start; // Included in the range
20+
std::string_view limit; // Not included in the range
21+
22+
Range() { }
23+
Range(const std::string_view &s, const std::string_view &l) : start(s), limit(l) { }
24+
};
25+
1626
class DBImpl
1727
{
1828
public:
@@ -21,12 +31,32 @@ class DBImpl
2131

2232
Status open();
2333
Status newDB();
34+
// Set the database entry for "key" to "value". Returns OK on success,
35+
// and a non-OK status on error.
36+
// Note: consider setting options.sync = true.
2437
Status put(const WriteOptions&, const std::string_view &key, const std::string_view &value);
38+
// Remove the database entry (if any) for "key". Returns OK on
39+
// success, and a non-OK status on error. It is not an error if "key"
40+
// did not exist in the database.
41+
// Note: consider setting options.sync = true.
2542
Status del(const WriteOptions&, const std::string_view &key);
43+
// Apply the specified updates to the database.
44+
// Returns OK on success, non-OK on failure.
45+
// Note: consider setting options.sync = true.
2646
Status write(const WriteOptions &options, WriteBatch *updates);
47+
// If the database contains an entry for "key" store the
48+
// corresponding value in *value and return OK.
49+
//
50+
// If there is no entry for "key" leave *value unchanged and return
51+
// a status for which Status::IsNotFound() returns true.
52+
//
53+
// May return some other Status on an error.
2754
Status get(const ReadOptions &options, const std::string_view &key, std::string *value);
2855
Status destroyDB(const std::string &dbname, const Options &options);
2956
void deleteObsoleteFiles();
57+
// Recover the descriptor from persistent storage. May do a significant
58+
// amount of work to recover recently logged updates. Any changes to
59+
// be made to the descriptor are added to *edit.
3060
Status recover(VersionEdit *edit, bool *saveManifest);
3161
Status recoverLogFile(uint64_t logNumber, bool lastLog,
3262
bool *saveManifest, VersionEdit *edit, uint64_t *maxSequence);
@@ -37,10 +67,42 @@ class DBImpl
3767
void compactMemTable();
3868
std::shared_ptr<Iterator> newInternalIterator(const ReadOptions &options,
3969
uint64_t *latestSnapshot, uint32_t *seed);
70+
// Return a heap-allocated iterator over the contents of the database.
71+
// The result of NewIterator() is initially invalid (caller must
72+
// call one of the Seek methods on the iterator before using it).
73+
//
74+
// Caller should delete the iterator when it is no longer needed.
75+
// The returned iterator should be deleted before this db is deleted.
4076
std::shared_ptr<Iterator> newIterator(const ReadOptions &options);
4177
Options sanitizeOptions(const std::string &dbname,
4278
const InternalKeyComparator *icmp,
4379
const Options &src);
80+
// DB implementations can export properties about their state
81+
// via this method. If "property" is a valid property understood by this
82+
// DB implementation, fills "*value" with its current value and returns
83+
// true. Otherwise returns false.
84+
//
85+
//
86+
// Valid property names include:
87+
//
88+
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
89+
// where <N> is an ASCII representation of a level number (e.g. "0").
90+
// "leveldb.stats" - returns a multi-line string that describes statistics
91+
// about the internal operation of the DB.
92+
// "leveldb.sstables" - returns a multi-line string that describes all
93+
// of the sstables that make up the db contents.
94+
// "leveldb.approximate-memory-usage" - returns the approximate number of
95+
// bytes of memory in use by the DB.
96+
bool getProperty(const std::string_view &property, std::string *value);
97+
// For each i in [0,n-1], store in "sizes[i]", the approximate
98+
// file system space used by keys in "[range[i].start .. range[i].limit)".
99+
//
100+
// Note that the returned sizes measure file system space usage, so
101+
// if the user data compresses by a factor of ten, the returned
102+
// sizes will be one-tenth the size of the corresponding user data size.
103+
//
104+
// The results may not include the sizes of recently written data.
105+
void getApproximateSizes(const Range *range, int n, uint64_t* sizes);
44106

45107
// Force current memtable contents to be compacted.
46108
Status testCompactMemTable();

‎example/leveldb/src/dbtest.cc

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#include <stdint.h>
2+
#include <stdio.h>
3+
4+
#include <algorithm>
5+
#include <set>
6+
#include <string>
7+
#include <vector>
8+
#include <assert.h>
9+
#include <random>
10+
11+
#include "dbimpl.h"
12+
#include "filename.h"
13+
#include "logwriter.h"
14+
#include "logreader.h"
15+
#include "tablecache.h"
16+
#include "versionset.h"
17+
#include "status.h"
18+
#include "table.h"
19+
#include "tablebuilder.h"
20+
#include "block.h"
21+
#include "merger.h"
22+
#include "option.h"
23+
24+
const int kNumNonTableCacheFiles = 10;
25+
26+
std::default_random_engine e;
27+
28+
std::string makeKey(unsigned int num)
29+
{
30+
char buf[30];
31+
snprintf(buf, sizeof(buf), "%016u", num);
32+
return std::string(buf);
33+
}
34+
35+
class DBTest
36+
{
37+
public:
38+
std::string dbname;
39+
DBImpl db;
40+
};
41+
42+
void bmLogAndApply(int iters, int numbasefiles)
43+
{
44+
Options opts;
45+
opts.createIfMissing = true;
46+
std::string dbname = "./leveldb_test_benchmark";
47+
DBImpl db(opts, dbname);
48+
db.destroyDB(dbname, opts);
49+
50+
Status s = db.open();
51+
assert(s.ok());
52+
53+
BytewiseComparatorImpl byteImpl;
54+
InternalKeyComparator cmp(&byteImpl);
55+
Options options;
56+
VersionSet vset(dbname, options, nullptr, &cmp);
57+
bool manifest;
58+
assert(vset.recover(&manifest).ok());
59+
60+
VersionEdit vbase;
61+
uint64_t fnum = 1;
62+
for (int i = 0; i < numbasefiles; i++)
63+
{
64+
InternalKey start(makeKey(2 * fnum), 1, kTypeValue);
65+
InternalKey limit(makeKey(2 * fnum + 1), 1, kTypeDeletion);
66+
vbase.addFile(2, fnum++, 1 /* file size */, start, limit);
67+
}
68+
69+
assert(vset.logAndApply(&vbase).ok());
70+
uint64_t startMicros = options.env->nowMicros();
71+
72+
for (int i = 0; i < iters; i++)
73+
{
74+
VersionEdit vedit;
75+
vedit.deleteFile(2, fnum);
76+
InternalKey start(makeKey(2 * fnum), 1, kTypeValue);
77+
InternalKey limit(makeKey(2 * fnum + 1), 1, kTypeDeletion);
78+
vedit.addFile(2, fnum++, 1 /* file size */, start, limit);
79+
vset.logAndApply(&vedit);
80+
}
81+
82+
uint64_t stopMicros = options.env->nowMicros();
83+
unsigned int us = stopMicros - startMicros;
84+
char buf[16];
85+
snprintf(buf, sizeof(buf), "%d", numbasefiles);
86+
fprintf(stderr,
87+
"BM_LogAndApply/%-6s %8d iters : %9u us (%7.0f us / iter)\n",
88+
buf, iters, us, ((float)us) / iters);
89+
}
90+
91+
int main()
92+
{
93+
bmLogAndApply(1000, 1);
94+
bmLogAndApply(1000, 100);
95+
bmLogAndApply(1000, 10000);
96+
bmLogAndApply(100, 100000);
97+
return 0;
98+
}

‎example/leveldb/src/logging.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@ Status readFileToString(PosixEnv *env, const std::string &fname, std::string *da
2626

2727
Status writeStringToFileSync(PosixEnv *env, const std::string &data, const std::string &fname);
2828

29+
// Return a human-readable version of "value".
30+
// Escapes any non-printable characters found in "value".
2931
std::string escapeString(const std::string &value);
32+

‎example/leveldb/src/option.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include "option.h"
2+
3+
// Create an Options object with default values for all fields.
4+
5+
static BytewiseComparatorImpl byteImpl;
6+
Options::Options()
7+
:comparator(&byteImpl),
8+
createIfMissing(false),
9+
errorIfExists(false),
10+
paranoidChecks(false),
11+
env(new PosixEnv()),
12+
writeBufferSize(4 << 20),
13+
maxOpenFiles(1000),
14+
blockSize(4096),
15+
blockRestartInterval(16),
16+
maxFileSize(2 << 20),
17+
compression(kNoCompression),
18+
reuseLogs(false)
19+
{
20+
21+
}

‎example/leveldb/src/posix.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Status PosixWritableFile::syncDirIfManifest()
9393
}
9494
else
9595
{
96-
if (fsync(fd) < 0)
96+
if (::fsync(fd) < 0)
9797
{
9898
s = posixError(dir, errno);
9999
}
@@ -112,14 +112,19 @@ Status PosixWritableFile::sync()
112112
return s;
113113
}
114114

115+
#ifdef __APPLE__
115116
s = flushBuffered();
116-
if (s.ok())
117+
if (s.ok() && ::fsync(fd) != 0)
117118
{
118-
if (::fsync(fd) != 0)
119-
{
120-
s = posixError(filename, errno);
121-
}
119+
s = posixError(filename, errno);
120+
}
121+
#else
122+
s = flushBuffered();
123+
if (s.ok() && ::fdatasync(fd) != 0)
124+
{
125+
s = posixError(filename, errno);
122126
}
127+
#endif
123128
return s;
124129
}
125130

‎example/leveldb/src/tablecache.cc

Lines changed: 104 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,104 @@
1-
#include "tablecache.h"
2-
#include "filename.h"
3-
#include "coding.h"
4-
#include "table.h"
5-
6-
struct TableAndFile
7-
{
8-
std::shared_ptr<RandomAccessFile> file;
9-
std::shared_ptr<Table> table;
10-
};
11-
12-
static void deleteEntry(const std::string_view &key, std::any &value)
13-
{
14-
15-
}
16-
17-
TableCache::TableCache(const std::string &dbname, const Options &options, int entries)
18-
:options(options),
19-
dbname(dbname),
20-
cache(new ShardedLRUCache(entries))
21-
{
22-
23-
}
24-
25-
TableCache::~TableCache()
26-
{
27-
28-
}
29-
30-
Status TableCache::findTable(uint64_t fileNumber, uint64_t fileSize,
31-
std::shared_ptr<LRUHandle> &handle)
32-
{
33-
Status s;
34-
char buf[sizeof(fileNumber)];
35-
encodeFixed64(buf, fileNumber);
36-
std::string_view key(buf, sizeof(buf));
37-
handle = cache->lookup(key);
38-
if (handle == nullptr)
39-
{
40-
std::string fname = tableFileName(dbname, fileNumber);
41-
std::shared_ptr<RandomAccessFile> file = nullptr;
42-
std::shared_ptr<Table> table = nullptr;
43-
s = options.env->newRandomAccessFile(fname, file);
44-
if (s.ok())
45-
{
46-
s = Table::open(options, file, fileSize, table);
47-
}
48-
49-
if (!s.ok())
50-
{
51-
assert(table == nullptr);
52-
// We do not cache error results so that if the error is transient,
53-
// or somebody repairs the file, we recover automatically.
54-
}
55-
else
56-
{
57-
std::shared_ptr<TableAndFile> tf(new TableAndFile);
58-
tf->file = file;
59-
tf->table = table;
60-
handle = cache->insert(key, tf, 1, nullptr);
61-
printf("Table cache is open %s\n", fname.c_str());
62-
}
63-
}
64-
return s;
65-
}
66-
67-
std::shared_ptr<Iterator> TableCache::newIterator(const ReadOptions &options,
68-
uint64_t fileNumber,
69-
uint64_t fileSize,
70-
std::shared_ptr<Table> tableptr)
71-
{
72-
std::shared_ptr<LRUHandle> handle;
73-
Status s = findTable(fileNumber, fileSize, handle);
74-
if (!s.ok())
75-
{
76-
return newErrorIterator(s);
77-
}
78-
79-
std::shared_ptr<Table> table = std::any_cast<std::shared_ptr<TableAndFile>>(handle->value)->table;
80-
std::shared_ptr<Iterator> result = table->newIterator(options);
81-
result->registerCleanup(handle);
82-
tableptr = table;
83-
return result;
84-
}
85-
86-
Status TableCache::get(const ReadOptions &options,
87-
uint64_t fileNumber,
88-
uint64_t fileSize,
89-
const std::string_view &k,
90-
const std::any &arg,
91-
std::function<void( const std::any &,
92-
const std::string_view &, const std::string_view &)> &&callback)
93-
{
94-
std::shared_ptr<LRUHandle> handle;
95-
Status s = findTable(fileNumber, fileSize, handle);
96-
if (s.ok())
97-
{
98-
//printf("Table cache get file number :%d bytes %lld\n", fileNumber, fileSize);
99-
std::shared_ptr<Table> table = std::any_cast<std::shared_ptr<TableAndFile>>(handle->value)->table;
100-
s = table->internalGet(options, k, arg, callback);
101-
}
102-
103-
return s;
104-
}
1+
#include "tablecache.h"
2+
#include "filename.h"
3+
#include "coding.h"
4+
#include "table.h"
5+
6+
struct TableAndFile
7+
{
8+
std::shared_ptr<RandomAccessFile> file;
9+
std::shared_ptr<Table> table;
10+
};
11+
12+
static void deleteEntry(const std::string_view &key, std::any &value)
13+
{
14+
15+
}
16+
17+
TableCache::TableCache(const std::string &dbname, const Options &options, int entries)
18+
:options(options),
19+
dbname(dbname),
20+
cache(new ShardedLRUCache(entries))
21+
{
22+
23+
}
24+
25+
TableCache::~TableCache()
26+
{
27+
28+
}
29+
30+
Status TableCache::findTable(uint64_t fileNumber, uint64_t fileSize,
31+
std::shared_ptr<LRUHandle> &handle)
32+
{
33+
Status s;
34+
char buf[sizeof(fileNumber)];
35+
encodeFixed64(buf, fileNumber);
36+
std::string_view key(buf, sizeof(buf));
37+
handle = cache->lookup(key);
38+
if (handle == nullptr)
39+
{
40+
std::string fname = tableFileName(dbname, fileNumber);
41+
std::shared_ptr<RandomAccessFile> file = nullptr;
42+
std::shared_ptr<Table> table = nullptr;
43+
s = options.env->newRandomAccessFile(fname, file);
44+
if (s.ok())
45+
{
46+
s = Table::open(options, file, fileSize, table);
47+
}
48+
49+
if (!s.ok())
50+
{
51+
assert(table == nullptr);
52+
// We do not cache error results so that if the error is transient,
53+
// or somebody repairs the file, we recover automatically.
54+
}
55+
else
56+
{
57+
std::shared_ptr<TableAndFile> tf(new TableAndFile);
58+
tf->file = file;
59+
tf->table = table;
60+
handle = cache->insert(key, tf, 1, nullptr);
61+
printf("Table cache is open %s\n", fname.c_str());
62+
}
63+
}
64+
return s;
65+
}
66+
67+
std::shared_ptr<Iterator> TableCache::newIterator(const ReadOptions &options,
68+
uint64_t fileNumber,
69+
uint64_t fileSize,
70+
std::shared_ptr<Table> tableptr)
71+
{
72+
std::shared_ptr<LRUHandle> handle;
73+
Status s = findTable(fileNumber, fileSize, handle);
74+
if (!s.ok())
75+
{
76+
return newErrorIterator(s);
77+
}
78+
79+
std::shared_ptr<Table> table = std::any_cast<std::shared_ptr<TableAndFile>>(handle->value)->table;
80+
std::shared_ptr<Iterator> result = table->newIterator(options);
81+
result->registerCleanup(handle);
82+
tableptr = table;
83+
return result;
84+
}
85+
86+
Status TableCache::get(const ReadOptions &options,
87+
uint64_t fileNumber,
88+
uint64_t fileSize,
89+
const std::string_view &k,
90+
const std::any &arg,
91+
std::function<void( const std::any &,
92+
const std::string_view &, const std::string_view &)> &&callback)
93+
{
94+
std::shared_ptr<LRUHandle> handle;
95+
Status s = findTable(fileNumber, fileSize, handle);
96+
if (s.ok())
97+
{
98+
//printf("Table cache get file number :%d bytes %lld\n", fileNumber, fileSize);
99+
std::shared_ptr<Table> table = std::any_cast<std::shared_ptr<TableAndFile>>(handle->value)->table;
100+
s = table->internalGet(options, k, arg, callback);
101+
}
102+
103+
return s;
104+
}

‎example/leveldb/src/tablecache.h

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,51 @@
1-
#pragma once
2-
#include <string>
3-
#include <stdint.h>
4-
#include "dbformat.h"
5-
#include "cache.h"
6-
#include "table.h"
7-
#include "option.h"
8-
9-
//, std::bind(&Version::saveValue, this,
10-
// std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
11-
//std::function<void(const std::any &arg,
12-
// const std::string_view &k, const std::any &value)> &&callback
13-
class TableCache
14-
{
15-
public:
16-
TableCache(const std::string &dbname, const Options &options, int entries);
17-
~TableCache();
18-
19-
// Return an iterator for the specified file number (the corresponding
20-
// file length must be exactly "file_size" bytes). If "tableptr" is
21-
// non-null, also sets "*tableptr" to point to the Table object
22-
// underlying the returned iterator, or to nullptr if no Table object
23-
// underlies the returned iterator. The returned "*tableptr" object is owned
24-
// by the cache and should not be deleted, and is valid for as long as the
25-
// returned iterator is live.
26-
std::shared_ptr<Iterator> newIterator(const ReadOptions &options,
27-
uint64_t fileNumber,
28-
uint64_t fileSize,
29-
std::shared_ptr<Table> tableptr = nullptr);
30-
31-
// If a seek to internal key "k" in specified file finds an entry,
32-
// call (*handle_result)(arg, found_key, found_value).
33-
Status get(const ReadOptions &options,
34-
uint64_t fileNumber,
35-
uint64_t fileSize,
36-
const std::string_view &k,
37-
const std::any &arg,
38-
std::function<void( const std::any &,
39-
const std::string_view &, const std::string_view &)> &&callback);
40-
Status findTable(uint64_t fileNumber, uint64_t fileSize,
41-
std::shared_ptr<LRUHandle> &handle);
42-
private:
43-
TableCache(const TableCache&) = delete;
44-
void operator=(const TableCache&) = delete;
45-
46-
std::string dbname;
47-
const Options options;
48-
std::shared_ptr<ShardedLRUCache> cache;
49-
};
1+
#pragma once
2+
#include <string>
3+
#include <stdint.h>
4+
#include "dbformat.h"
5+
#include "cache.h"
6+
#include "table.h"
7+
#include "option.h"
8+
9+
//, std::bind(&Version::saveValue, this,
10+
// std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
11+
//std::function<void(const std::any &arg,
12+
// const std::string_view &k, const std::any &value)> &&callback
13+
class TableCache
14+
{
15+
public:
16+
TableCache(const std::string &dbname, const Options &options, int entries);
17+
~TableCache();
18+
19+
// Return an iterator for the specified file number (the corresponding
20+
// file length must be exactly "file_size" bytes). If "tableptr" is
21+
// non-null, also sets "*tableptr" to point to the Table object
22+
// underlying the returned iterator, or to nullptr if no Table object
23+
// underlies the returned iterator. The returned "*tableptr" object is owned
24+
// by the cache and should not be deleted, and is valid for as long as the
25+
// returned iterator is live.
26+
std::shared_ptr<Iterator> newIterator(const ReadOptions &options,
27+
uint64_t fileNumber,
28+
uint64_t fileSize,
29+
std::shared_ptr<Table> tableptr = nullptr);
30+
31+
// If a seek to internal key "k" in specified file finds an entry,
32+
// call (*handle_result)(arg, found_key, found_value).
33+
Status get(const ReadOptions &options,
34+
uint64_t fileNumber,
35+
uint64_t fileSize,
36+
const std::string_view &k,
37+
const std::any &arg,
38+
std::function<void( const std::any &,
39+
const std::string_view &, const std::string_view &)> &&callback);
40+
Status findTable(uint64_t fileNumber, uint64_t fileSize,
41+
std::shared_ptr<LRUHandle> &handle);
42+
std::shared_ptr<ShardedLRUCache> getCache() { return cache; }
43+
44+
private:
45+
TableCache(const TableCache&) = delete;
46+
void operator=(const TableCache&) = delete;
47+
48+
std::string dbname;
49+
const Options options;
50+
std::shared_ptr<ShardedLRUCache> cache;
51+
};

‎example/leveldb/src/versionset.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,34 @@ void Version::getOverlappingInputs(
511511
}
512512
}
513513

514+
std::string Version::debugString() const
515+
{
516+
std::string r;
517+
for (int level = 0; level < kNumLevels; level++)
518+
{
519+
// E.g.,
520+
// --- level 1 ---
521+
// 17:123['a' .. 'd']
522+
// 20:43['e' .. 'g']
523+
r.append("--- level ");
524+
appendNumberTo(&r, level);
525+
r.append(" ---\n");
526+
const std::vector<std::shared_ptr<FileMetaData>> &f = files[level];
527+
for (size_t i = 0; i < f.size(); i++) {
528+
r.push_back(' ');
529+
appendNumberTo(&r, f[i]->number);
530+
r.push_back(':');
531+
appendNumberTo(&r, f[i]->fileSize);
532+
r.append("[");
533+
r.append(f[i]->smallest.debugString());
534+
r.append(" .. ");
535+
r.append(f[i]->largest.debugString());
536+
r.append("]\n");
537+
}
538+
}
539+
return r;
540+
}
541+
514542
int Version::pickLevelForMemTableOutput(const std::string_view &smallestUserKey,
515543
const std::string_view &largestUserKey)
516544
{
@@ -866,6 +894,13 @@ void VersionSet::markFileNumberUsed(uint64_t number)
866894
}
867895
}
868896

897+
int64_t VersionSet::numLevelBytes(int level) const
898+
{
899+
assert(level >= 0);
900+
assert(level < kNumLevels);
901+
return totalFileSize(current()->files[level]);
902+
}
903+
869904
int VersionSet::numLevelFiles(int level) const
870905
{
871906
assert(level >= 0);
@@ -930,6 +965,10 @@ Status VersionSet::logAndApply(VersionEdit *edit)
930965
descriptorLog.reset(new LogWriter(descriptorFile.get()));
931966
//s = WriteSnapshot(descriptor_log_);
932967
}
968+
else
969+
{
970+
971+
}
933972
}
934973

935974
// Write new record to MANIFEST log
@@ -949,6 +988,8 @@ Status VersionSet::logAndApply(VersionEdit *edit)
949988
}
950989
}
951990

991+
descriptorLog.reset();
992+
descriptorFile.reset();
952993
// If we just created a new descriptor file, install it by writing a
953994
// new CURRENT file that points to it.
954995
if (s.ok() && !newManifestFile.empty())

‎example/leveldb/src/versionset.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class Version
6767
// REQUIRES: lock is held
6868
bool updateStats(const GetStats &stats);
6969

70+
// Return a human readable string that describes this version's contents.
71+
std::string debugString() const;
72+
7073
// No copying allowed
7174
Version(const Version&);
7275
void operator=(const Version&);
@@ -197,7 +200,10 @@ class VersionSet
197200

198201
void appendVersion(const std::shared_ptr<Version> &v);
199202
bool reuseManifest(const std::string &dscname, const std::string &dscbase);
203+
// Return the number of Table files at the specified level.
200204
int numLevelFiles(int level) const;
205+
// Return the combined file size of all files at the specified level.
206+
int64_t numLevelBytes(int level) const;
201207

202208
std::shared_ptr<Iterator> makeInputIterator(Compaction *c);
203209

‎example/mapreduce/sort/filesort.cc

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#include <unistd.h>
2+
#include <memory>
3+
#include <fstream>
4+
#include <iostream>
5+
#include <unordered_map>
6+
#include <map>
7+
#include <vector>
8+
#include <set>
9+
#include <assert.h>
10+
#include <algorithm>
11+
12+
int32_t index = 0;
13+
const size_t kMaxSize = 10 * 1000 * 1000;
14+
15+
class Sharder
16+
{
17+
public:
18+
explicit Sharder(int32_t nbuckets)
19+
{
20+
for (int32_t i = 0; i < nbuckets; ++i)
21+
{
22+
char buf[256];
23+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets);
24+
std::shared_ptr<std::ofstream> in(new std::ofstream(buf));
25+
buckets.push_back(in);
26+
}
27+
assert(buckets.size() == static_cast<size_t>(nbuckets));
28+
}
29+
30+
void output(const std::string &key, const std::string &value)
31+
{
32+
size_t idx = std::hash<std::string>()(key) % buckets.size();
33+
*buckets[idx] << key << '\t' << value << '\n';
34+
}
35+
36+
protected:
37+
std::vector<std::shared_ptr<std::ofstream>> buckets;
38+
private:
39+
Sharder(const Sharder&);
40+
void operator=(const Sharder&);
41+
};
42+
43+
void shard(int32_t nbuckets, int32_t argc, char *argv[])
44+
{
45+
Sharder sharder(nbuckets);
46+
for (int32_t i = 1; i < argc; ++i)
47+
{
48+
std::cout << " processing input file " << argv[i] << std::endl;
49+
std::map<std::string, std::string> shareds;
50+
std::ifstream in(argv[i]);
51+
while (in && !in.eof())
52+
{
53+
shareds.clear();
54+
std::string str;
55+
while (in >> str)
56+
{
57+
shareds.insert(std::make_pair(str, str));
58+
if (shareds.size() > kMaxSize)
59+
{
60+
std::cout << " split" << std::endl;
61+
break;
62+
}
63+
}
64+
65+
for (const auto &kv : shareds)
66+
{
67+
sharder.output(kv.first, kv.second);
68+
}
69+
}
70+
}
71+
std::cout << "shuffling done" << std::endl;
72+
}
73+
74+
// ======= sortShareds =======
75+
76+
std::map<std::string, std::string> readShard(int32_t idx, int32_t nbuckets)
77+
{
78+
std::map<std::string, std::string> shareds;
79+
80+
char buf[256];
81+
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
82+
std::cout << " reading " << buf << std::endl;
83+
{
84+
std::ifstream in(buf);
85+
std::string line;
86+
87+
while (getline(in, line))
88+
{
89+
size_t tab = line.find('\t');
90+
assert(tab != std::string::npos);
91+
std::string key(line.c_str(), line.c_str() + tab);
92+
std::string value(line.c_str() + tab + 1, line.c_str() + line.size());
93+
shareds.insert(std::make_pair(key, value));
94+
}
95+
}
96+
97+
::unlink(buf);
98+
return shareds;
99+
}
100+
101+
void sortShareds(const int32_t nbuckets)
102+
{
103+
for (int32_t i = 0; i < nbuckets; ++i)
104+
{
105+
// std::cout << " sorting " << std::endl;
106+
std::map<std::string, std::string> shareds;
107+
for (const auto &entry : readShard(i, nbuckets))
108+
{
109+
shareds.insert(std::make_pair(entry.first, entry.second));
110+
}
111+
112+
char buf[256];
113+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
114+
std::ofstream out(buf);
115+
std::cout << " writing " << buf << std::endl;
116+
for (auto &it : shareds)
117+
{
118+
out << it.first << '\t' << it.second << '\n';
119+
}
120+
}
121+
std::cout << "reducing done" << std::endl;
122+
}
123+
124+
// ======= merge =======
125+
126+
class Source // copyable
127+
{
128+
public:
129+
explicit Source(const std::shared_ptr<std::ifstream> &in)
130+
: in(in)
131+
{
132+
133+
}
134+
135+
bool next()
136+
{
137+
std::string line;
138+
if (getline(*(in.get()), line))
139+
{
140+
size_t tab = line.find('\t');
141+
if (tab != std::string::npos)
142+
{
143+
std::string key(line.c_str(), line.c_str() + tab);
144+
this->key = key;
145+
std::string(line.c_str() + tab + 1, line.c_str() + line.size());
146+
this->value = value;
147+
return true;
148+
}
149+
}
150+
return false;
151+
}
152+
153+
bool operator<(const Source &rhs) const
154+
{
155+
return key < rhs.key;
156+
}
157+
158+
void retrieve()
159+
{
160+
index += key.size() + 3 + value.size() + sizeof(int32_t)
161+
}
162+
163+
void outputTo(std::ostream &out, int32_t index) const
164+
{
165+
out << key << '\t' << value << '\t' << std::to_string(index) << '\n';
166+
retrieve();
167+
}
168+
169+
std::shared_ptr<std::ifstream> in;
170+
std::string key;
171+
std::string value;
172+
};
173+
174+
void merge(const int32_t nbuckets)
175+
{
176+
std::vector<std::shared_ptr<std::ifstream>> inputs;
177+
std::vector<Source> keys;
178+
179+
for (int32_t i = 0; i < nbuckets; ++i)
180+
{
181+
char buf[256];
182+
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
183+
std::shared_ptr<std::ifstream> in(new std::ifstream(buf));
184+
inputs.push_back(in);
185+
Source rec(in);
186+
if (rec.next())
187+
{
188+
keys.push_back(rec);
189+
}
190+
::unlink(buf);
191+
}
192+
193+
std::ofstream out("output");
194+
std::make_heap(keys.begin(), keys.end());
195+
196+
while (!keys.empty())
197+
{
198+
std::pop_heap(keys.begin(), keys.end());
199+
keys.back().outputTo(out, index);
200+
201+
if (keys.back().next())
202+
{
203+
std::push_heap(keys.begin(), keys.end());
204+
}
205+
else
206+
{
207+
keys.pop_back();
208+
}
209+
}
210+
}
211+
212+
int32_t main(int32_t argc, char *argv[])
213+
{
214+
int32_t nbuckets = 10;
215+
shard(nbuckets, argc, argv);
216+
sortShareds(nbuckets);
217+
merge(nbuckets);
218+
}

‎example/mapreduce/sort/shardsbasic.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class Source // copyable
143143
bool next()
144144
{
145145
std::string line;
146-
if (getline(*in, line))
146+
if (getline(*(in.get()), line))
147147
{
148148
size_t tab = line.find('\t');
149149
if (tab != std::string::npos)
@@ -184,7 +184,7 @@ void merge(const int32_t nbuckets)
184184
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
185185
std::shared_ptr<std::ifstream> in(new std::ifstream(buf));
186186
inputs.push_back(in);
187-
Source rec(inputs.back());
187+
Source rec(in);
188188
if (rec.next())
189189
{
190190
keys.push_back(rec);
@@ -208,6 +208,7 @@ void merge(const int32_t nbuckets)
208208
if (++cnt >= topK)
209209
{
210210
keys.pop_back();
211+
break;
211212
}
212213

213214
if (keys.back().next())

‎example/memcached/server/server.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ void Item::output(Buffer *out, bool needCas) const
3333
{
3434
out->append("VALUE ");
3535
out->append(data, keyLen);
36-
xLogStream buf;
36+
LogStream buf;
3737
buf << ' ' << getFlags() << ' ' << valueLen - 2;
3838
if (needCas)
3939
{
@@ -581,17 +581,17 @@ bool MemcacheServer::storeItem(const ItemPtr &item, Item::UpdatePolicy policy, b
581581

582582
ConstItemPtr MemcacheServer::getItem(const ConstItemPtr &key) const
583583
{
584-
std::mutex & mutex = shards[key->getHash() % kShards].mutex;
585-
const ItemMap& items = shards[key->getHash() % kShards].items;
584+
std::mutex &mutex = shards[key->getHash() % kShards].mutex;
585+
const ItemMap &items = shards[key->getHash() % kShards].items;
586586
std::unique_lock <std::mutex> lck(mutex);
587587
ItemMap::const_iterator it = items.find(key);
588588
return it != items.end() ? *it : ConstItemPtr();
589589
}
590590

591591
bool MemcacheServer::deleteItem(const ConstItemPtr &key)
592592
{
593-
std::mutex & mutex = shards[key->getHash() % kShards].mutex;
594-
ItemMap& items = shards[key->getHash() % kShards].items;
593+
std::mutex &mutex = shards[key->getHash() % kShards].mutex;
594+
ItemMap &items = shards[key->getHash() % kShards].items;
595595
std::unique_lock <std::mutex> lck(mutex);
596596
return items.erase(key) == 1;
597597
}

‎example/redisproxy/redisproxy.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,7 @@ void RedisProxy::initRedisTimer()
9898
{
9999
auto it = threadHiredis.find(pools[i]->getThreadId());
100100
assert(it != threadHiredis.end());
101-
if (clusterEnabled)
102-
{
103-
pools[i]->runAfter(1.0, true, std::bind(&Hiredis::redisContextTimer, it->second.get()));
104-
}
101+
pools[i]->runAfter(1.0, true, std::bind(&Hiredis::redisContextTimer, it->second.get()));
105102
}
106103
}
107104

@@ -335,12 +332,13 @@ void RedisProxy::proxyCallback(const RedisAsyncContextPtr &c,
335332
RedisCluster proxy = std::any_cast<RedisCluster>(privdata);
336333

337334
int64_t proxyCount = proxy.proxyCount;
338-
int32_t commandCount = proxy.commandCount;
335+
int64_t commandCount = proxy.commandCount;
339336
RedisObjectPtr command = proxy.command;
340337
WeakTcpConnectionPtr weakConn = proxy.conn;
341338
TcpConnectionPtr conn = weakConn.lock();
342339
if (conn == nullptr)
343340
{
341+
LOG_WARN << "proxyCallback client disconnect err: ";
344342
return;
345343
}
346344

@@ -602,26 +600,26 @@ RedisAsyncContextPtr RedisProxy::checkReply(const RedisObjectPtr &command, const
602600
auto it = threadHiredis.find(conn->getLoop()->getThreadId());
603601
assert(it != threadHiredis.end());
604602

605-
RedisAsyncContextPtr redis = nullptr;
603+
RedisAsyncContextPtr redisAsync = nullptr;
606604
if (clusterEnabled)
607605
{
608-
redis = it->second->getRedisAsyncContext(command,
606+
redisAsync = it->second->getRedisAsyncContext(command,
609607
conn->getLoop()->getThreadId(), conn->getSockfd());
610608
}
611609
else
612610
{
613-
redis = it->second->getRedisAsyncContext(
611+
redisAsync = it->second->getRedisAsyncContext(
614612
conn->getLoop()->getThreadId(), conn->getSockfd());
615613
}
616614

617-
if (redis == nullptr)
615+
if (redisAsync == nullptr)
618616
{
619617
clearProxy(conn->getLoop()->getThreadId(), conn->getSockfd());
620618
std::string reply = it->second->getTcpClientInfo(
621619
conn->getLoop()->getThreadId(), conn->getSockfd());
622620
conn->sendPipe(reply.c_str(), reply.size());
623621
}
624-
return redis;
622+
return redisAsync;
625623
}
626624

627625
bool RedisProxy::debugCommand(const RedisObjectPtr &command, const std::vector<RedisObjectPtr> &commands,

‎src/redis/all.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <experimental/filesystem>
4545
#include <ratio>
4646
#include <chrono>
47+
#include <random>
4748
#ifdef _LUA
4849
#include <lua.hpp>
4950
#endif
@@ -114,9 +115,19 @@ typedef SSIZE_T ssize_t;
114115
#include <sys/utsname.h>
115116
#endif
116117

117-
#define REDIS_CONNECT_RETRIES 10
118+
#define LUA_TNONE (-1)
118119

120+
#define LUA_TNIL 0
121+
#define LUA_TBOOLEAN 1
122+
#define LUA_TLIGHTUSERDATA 2
123+
#define LUA_TNUMBER 3
124+
#define LUA_TSTRING 4
125+
#define LUA_TTABLE 5
126+
#define LUA_TFUNCTION 6
127+
#define LUA_TUSERDATA 7
128+
#define LUA_TTHREAD 8
119129

130+
#define REDIS_CONNECT_RETRIES 10
120131
/* Flag specific to the async API which means that the context should be clean
121132
* up as soon as possible. */
122133
#define REDIS_FREEING 10

‎src/redis/buffer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class Buffer
156156

157157
void append(const char *data)
158158
{
159-
int32_t len = strlen(data) + 1;
159+
int32_t len = strlen(data);
160160
ensureWritableBytes(len);
161161
std::copy(data, data + len, beginWrite());
162162
hasWritten(len);

‎src/redis/hiredis.cc

Lines changed: 130 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,8 @@ int32_t RedisReader::redisReaderGetReply(RedisReplyPtr &reply)
676676

677677
RedisAsyncCallback::RedisAsyncCallback()
678678
:data(nullptr),
679-
len(0)
679+
len(0),
680+
type(false)
680681
{
681682

682683
}
@@ -685,7 +686,14 @@ RedisAsyncCallback::~RedisAsyncCallback()
685686
{
686687
if (data != nullptr)
687688
{
688-
zfree(data);
689+
if (type)
690+
{
691+
sdsfree(data);
692+
}
693+
else
694+
{
695+
zfree(data);
696+
}
689697
}
690698
}
691699

@@ -2030,6 +2038,30 @@ uint32_t Hiredis::keyHashSlot(char *key, int32_t keylen)
20302038
return crc16(key + s + 1, e - s - 1) & 0x3FFF;
20312039
}
20322040

2041+
int32_t RedisAsyncContext::redisAsyncCommandArgv(const RedisCallbackFn &fn,
2042+
const std::any &privdata, int32_t argc, const char **argv, const int32_t *argvlen)
2043+
{
2044+
sds cmd;
2045+
int32_t len;
2046+
int32_t status;
2047+
len = redisFormatSdsCommandArgv(&cmd, argc, argv, argvlen);
2048+
RedisCallback cb;
2049+
cb.fn = std::move(fn);
2050+
cb.privdata = std::move(privdata);
2051+
2052+
RedisAsyncCallbackPtr asyncCallback(new RedisAsyncCallback());
2053+
asyncCallback->data = cmd;
2054+
asyncCallback->len = len;
2055+
asyncCallback->cb = std::move(cb);
2056+
asyncCallback->type = true;
2057+
2058+
TcpConnectionPtr conn = weakRedisConn.lock();
2059+
assert(conn != nullptr);
2060+
conn->getLoop()->runInLoop(std::bind(&RedisAsyncContext::__redisAsyncCommand,
2061+
shared_from_this(), asyncCallback));
2062+
return REDIS_OK;
2063+
}
2064+
20332065
RedisAsyncContextPtr Hiredis::getRedisAsyncContext(const RedisObjectPtr &command,
20342066
const std::thread::id &threadId, int32_t sockfd)
20352067
{
@@ -2126,6 +2158,16 @@ RedisAsyncContextPtr Hiredis::getRedisAsyncContext(const std::thread::id &thread
21262158
}
21272159
}
21282160

2161+
RedisContextPtr Hiredis::getRedisContext(const int32_t sockfd)
2162+
{
2163+
std::unique_lock<std::mutex> lk(mutex);
2164+
if (redisContexts.empty())
2165+
{
2166+
return nullptr;
2167+
}
2168+
return redisContexts[(redisContexts.size() - 1) % sockfd];
2169+
}
2170+
21292171
std::vector<RedisContextPtr> Hiredis::getRedisContext(const std::thread::id &threadId)
21302172
{
21312173
std::unique_lock<std::mutex> lk(mutex);
@@ -2159,13 +2201,7 @@ RedisAsyncContextPtr Hiredis::getRedisAsyncContext(int32_t sockfd)
21592201
return nullptr;
21602202
}
21612203

2162-
int32_t index = (tcpClients.size() % sockfd) - 1;
2163-
if (index < 0 )
2164-
{
2165-
index = 0;
2166-
}
2167-
2168-
const TcpConnectionPtr &conn = tcpClients[index]->getConnection();
2204+
const TcpConnectionPtr &conn = tcpClients[(tcpClients.size() - 1) % sockfd]->getConnection();
21692205
if (conn == nullptr)
21702206
{
21712207
return nullptr;
@@ -2651,104 +2687,103 @@ TcpConnectionPtr Hiredis::redirectySlot(int32_t sockfd, EventLoop *loop, const c
26512687
void Hiredis::redisContextTimer()
26522688
{
26532689
loop->assertInLoopThread();
2654-
std::map<int16_t, std::string> ports;
26552690

2691+
std::unique_lock<std::mutex> lk(mutex);
2692+
auto it = threadRedisContexts.find(loop->getThreadId());
2693+
assert(it != threadRedisContexts.end());
2694+
2695+
for (auto &iter : it->second)
26562696
{
2657-
std::unique_lock<std::mutex> lk(mutex);
2658-
auto it = threadRedisContexts.find(loop->getThreadId());
2659-
assert(it != threadRedisContexts.end());
2697+
RedisReplyPtr reply = iter->redisCommand("PING");
2698+
if (reply == nullptr ||
2699+
!(strcmp(reply->str, "PONG") == 0)
2700+
|| reply->type != REDIS_REPLY_STATUS)
2701+
{
2702+
iter = redisConnect(iter->ip, iter->port);
2703+
}
26602704

2661-
for (auto &iter : it->second)
2705+
if (!proxyMode)
26622706
{
2663-
RedisReplyPtr reply = iter->redisCommand("PING");
2664-
if (reply != nullptr)
2665-
{
2666-
assert(strcmp(reply->str, "PONG") == 0);
2667-
assert(reply->type == REDIS_REPLY_STATUS);
2668-
}
2669-
else
2670-
{
2671-
iter = redisConnect(iter->ip, iter->port);
2672-
}
2707+
return ;
2708+
}
26732709

2674-
clusterNodes.clear();
2675-
reply = iter->redisCommand("cluster nodes");
2676-
if (reply != nullptr && reply->type != REDIS_REPLY_ERROR)
2710+
clusterNodes.clear();
2711+
reply = iter->redisCommand("cluster nodes");
2712+
if (reply != nullptr && reply->type != REDIS_REPLY_ERROR)
2713+
{
2714+
int32_t index = 0;
2715+
while (index < sdslen(reply->str))
26772716
{
2678-
int32_t index = 0;
2679-
while (index < sdslen(reply->str))
2717+
std::shared_ptr<ClusterNode> node(new ClusterNode());
2718+
const char *id = strchr(reply->str + index, ' ');
2719+
assert(id != nullptr);
2720+
2721+
node->id = std::string(reply->str + index, id - (reply->str + index));
2722+
const char *ip = strchr(id + 1, ':');
2723+
assert(ip != nullptr);
2724+
2725+
node->ip = std::string(id + 1, ip - id - 1);
2726+
const char *port = strchr(ip + 1, '@');
2727+
assert(port != nullptr);
2728+
2729+
int16_t p = atoi(ip + 1);
2730+
node->port = p;
2731+
const char *p1 = strchr(port + 1, ' ');
2732+
assert(p1 != nullptr);
2733+
2734+
const char *myself = strchr(p1 + 1, ' ');
2735+
std::string m = std::string(p1 + 1, myself - p1 - 1);
2736+
if (m == "myself,master" || m == "master")
26802737
{
2681-
std::shared_ptr<ClusterNode> node(new ClusterNode());
2682-
const char *id = strchr(reply->str + index, ' ');
2683-
assert(id != nullptr);
2684-
2685-
node->id = std::string(reply->str + index, id - (reply->str + index));
2686-
const char *ip = strchr(id + 1, ':');
2687-
assert(ip != nullptr);
2688-
2689-
node->ip = std::string(id + 1, ip - id - 1);
2690-
const char *port = strchr(ip + 1, '@');
2691-
assert(port != nullptr);
2692-
2693-
int16_t p = atoi(ip + 1);
2694-
node->port = p;
2695-
const char *p1 = strchr(port + 1, ' ');
2738+
node->master = "master";
2739+
const char *p1 = strchr(myself + 1, ' ');
26962740
assert(p1 != nullptr);
2697-
2698-
const char *myself = strchr(p1 + 1, ' ');
2699-
std::string m = std::string(p1 + 1, myself - p1 - 1);
2700-
if (m == "myself,master" || m == "master")
2741+
const char *p2 = strchr(p1 + 1, ' ');
2742+
assert(p2 != nullptr);
2743+
const char *p3 = strchr(p2 + 1, ' ');
2744+
assert(p3 != nullptr);
2745+
const char *p4 = strchr(p3 + 1, ' ');
2746+
assert(p4 != nullptr);
2747+
const char *p5 = strchr(p4 + 1, ' ');
2748+
assert(p5 != nullptr);
2749+
std::string str = std::string(p4 + 1, p5 - p4 - 1);
2750+
2751+
if (str == "connected")
27012752
{
2702-
node->master = "master";
2703-
const char *p1 = strchr(myself + 1, ' ');
2704-
assert(p1 != nullptr);
2705-
const char *p2 = strchr(p1 + 1, ' ');
2706-
assert(p2 != nullptr);
2707-
const char *p3 = strchr(p2 + 1, ' ');
2708-
assert(p3 != nullptr);
2709-
const char *p4 = strchr(p3 + 1, ' ');
2710-
assert(p4 != nullptr);
2711-
const char *p5 = strchr(p4 + 1, ' ');
2712-
assert(p5 != nullptr);
2713-
std::string str = std::string(p4 + 1, p5 - p4 - 1);
2714-
2715-
if (str == "connected")
2716-
{
2717-
node->status = 1;
2718-
}
2719-
else
2720-
{
2721-
node->status = 0;
2722-
}
2723-
2724-
int16_t startSlot = atoi(p5 + 1);
2725-
node->startSlot = startSlot;
2726-
const char *p6 = strchr(p5 + 1, '-');
2727-
assert(p6 != nullptr);
2728-
const char *p7 = strchr(p6 + 1, '\n');
2729-
assert(p7 != nullptr);
2730-
int16_t endSlot = atoi(p6 + 1);
2731-
node->endSlot = endSlot;
2732-
index = p7 + 1 - reply->str;
2753+
node->status = 1;
27332754
}
27342755
else
27352756
{
2736-
node->slave = "slave";
2737-
const char *p1 = strchr(myself + 1, ' ');
2738-
assert(p1 != nullptr);
2739-
std::string master = std::string(myself + 1, p1 - myself - 1);
2740-
const char *p2 = strchr(p1 + 1, ' ');
2741-
assert(p2 != nullptr);
2742-
const char *p3 = strchr(p2 + 1, ' ');
2743-
assert(p3 != nullptr);
2744-
const char *p4 = strchr(p3 + 1, ' ');
2745-
assert(p4 != nullptr);
2746-
const char *p5 = strchr(p4 + 1, '\n');
2747-
assert(p5 != nullptr);
2748-
index = p5 + 1 - reply->str;
2757+
node->status = 0;
27492758
}
2750-
clusterNodes.push_back(node);
2759+
2760+
int16_t startSlot = atoi(p5 + 1);
2761+
node->startSlot = startSlot;
2762+
const char *p6 = strchr(p5 + 1, '-');
2763+
assert(p6 != nullptr);
2764+
const char *p7 = strchr(p6 + 1, '\n');
2765+
assert(p7 != nullptr);
2766+
int16_t endSlot = atoi(p6 + 1);
2767+
node->endSlot = endSlot;
2768+
index = p7 + 1 - reply->str;
2769+
}
2770+
else
2771+
{
2772+
node->slave = "slave";
2773+
const char *p1 = strchr(myself + 1, ' ');
2774+
assert(p1 != nullptr);
2775+
std::string master = std::string(myself + 1, p1 - myself - 1);
2776+
const char *p2 = strchr(p1 + 1, ' ');
2777+
assert(p2 != nullptr);
2778+
const char *p3 = strchr(p2 + 1, ' ');
2779+
assert(p3 != nullptr);
2780+
const char *p4 = strchr(p3 + 1, ' ');
2781+
assert(p4 != nullptr);
2782+
const char *p5 = strchr(p4 + 1, '\n');
2783+
assert(p5 != nullptr);
2784+
index = p5 + 1 - reply->str;
27512785
}
2786+
clusterNodes.push_back(node);
27522787
}
27532788
}
27542789

@@ -2770,14 +2805,14 @@ void Hiredis::redisContextTimer()
27702805

27712806
if (!mark)
27722807
{
2773-
ports[it->port] = it->ip;
2808+
consul[it->port] = it->ip;
27742809
}
27752810
}
27762811
}
27772812
}
27782813
}
27792814

2780-
for (auto &it : ports)
2815+
for (auto &it : consul)
27812816
{
27822817
start(loop, 0, it.second.c_str(), it.first);
27832818
}

‎src/redis/hiredis.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ struct RedisAsyncCallback
169169

170170
RedisCallback cb;
171171
int32_t len;
172-
char *data;
172+
sds data;
173+
bool type;
173174
};
174175

175176
/* Subscription callbacks */
@@ -209,13 +210,16 @@ class RedisAsyncContext : public std::enable_shared_from_this<RedisAsyncContext>
209210
RedisAsyncContext(Buffer *buffer, const TcpConnectionPtr &conn);
210211
~RedisAsyncContext();
211212

213+
int32_t redisAsyncCommandArgv(const RedisCallbackFn &fn,
214+
const std::any &privdata, int32_t argc,
215+
const char **argv, const int32_t *argvlen);
212216
int32_t __redisAsyncCommand(const RedisAsyncCallbackPtr &asyncCallback);
213217
int32_t redisvAsyncCommand(const RedisCallbackFn &fn,
214218
const std::any &privdata, const char *format, va_list ap);
215219
int32_t redisAsyncCommand(const RedisCallbackFn &fn,
216220
const std::any &privdata, const char *format, ...);
217-
int32_t threadProxyRedisvAsyncCommand(const RedisCallbackFn &fn, const char *data,
218-
int32_t len, const std::any &privdata);
221+
int32_t threadProxyRedisvAsyncCommand(const RedisCallbackFn &fn,
222+
const char *data, int32_t len, const std::any &privdata);
219223

220224
int32_t proxyAsyncCommand(const RedisAsyncCallbackPtr &asyncCallback);
221225
int32_t processCommand(const RedisCallbackFn &fn,
@@ -292,7 +296,7 @@ class Hiredis
292296
RedisAsyncContextPtr getRedisAsyncContext(int32_t sockfd);
293297
RedisAsyncContextPtr getRedisAsyncContext();
294298
std::vector<RedisContextPtr> getRedisContext(const std::thread::id &threadId);
295-
299+
RedisContextPtr getRedisContext(const int32_t sockfd);
296300
RedisAsyncContextPtr getClusterRedisAsyncContext(const std::thread::id &threadId);
297301
std::string getTcpClientInfo(const std::thread::id &threadId, int32_t sockfd);
298302
std::string setTcpClientInfo(const char *ip, int16_t port);
@@ -312,7 +316,7 @@ class Hiredis
312316
std::unordered_map<std::thread::id, std::vector<RedisContextPtr>> threadRedisContexts;
313317
std::vector<TcpConnectionPtr> moveAskClients;
314318
std::vector<std::shared_ptr<ClusterNode>> clusterNodes;
315-
319+
std::map<int16_t, std::string> consul;
316320
ConnectionCallback connectionCallback;
317321
DisConnectionCallback disConnectionCallback;
318322
EventLoop *loop;

‎src/redis/log.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,6 @@ void defaultOutput(const char* msg, int32_t len)
525525
size_t n = ::fwrite(msg, 1, len, stdout);
526526
//FIXME check n
527527
(void)n;
528-
printf("%s\n", msg);
529528
}
530529

531530
void defaultFlush()

0 commit comments

Comments
 (0)
Please sign in to comment.