Skip to content

Commit 8bac268

Browse files
committed
redis proxy reconnect , master replication change status error ,add proxy log
1 parent 09a7a76 commit 8bac268

File tree

212 files changed

+34790
-39294
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

212 files changed

+34790
-39294
lines changed

bench/bench/bench.cc

+104-119
Original file line numberDiff line numberDiff line change
@@ -1,139 +1,124 @@
11
#pragma once
2+
23
#include "all.h"
34
#include "tcpconnection.h"
45
#include "tcpserver.h"
56
#include "eventloop.h"
67
#include "log.h"
78
#include "timerqueue.h"
89

9-
std::vector<int32_t> pipes;
10+
std::vector <int32_t> pipes;
1011
int32_t numPipes;
1112
int32_t numActive;
1213
int32_t numWrites;
1314
EventLoop *loop;
14-
std::vector<std::shared_ptr<Channel>> channels;
15-
int32_t reads,writes,fired;
16-
17-
void readCallback(int32_t fd,int32_t idx)
18-
{
19-
char ch;
20-
reads += static_cast<int32_t>(::recv(fd,&ch,sizeof(ch),0));
21-
if (writes > 0)
22-
{
23-
int32_t widx = idx+1;
24-
if (widx >= numPipes)
25-
{
26-
widx -= numPipes;
27-
}
28-
29-
::send(pipes[2 * widx + 1],"m",1,0);
30-
writes--;
31-
fired++;
32-
}
33-
34-
if (fired == reads)
35-
{
36-
loop->quit();
37-
}
15+
std::vector <std::shared_ptr<Channel>> channels;
16+
int32_t reads, writes, fired;
17+
18+
void readCallback(int32_t fd, int32_t idx) {
19+
char ch;
20+
reads += static_cast<int32_t>(::recv(fd, &ch, sizeof(ch), 0));
21+
if (writes > 0) {
22+
int32_t widx = idx + 1;
23+
if (widx >= numPipes) {
24+
widx -= numPipes;
25+
}
26+
27+
::send(pipes[2 * widx + 1], "m", 1, 0);
28+
writes--;
29+
fired++;
30+
}
31+
32+
if (fired == reads) {
33+
loop->quit();
34+
}
3835
}
3936

4037

41-
std::pair<int32_t,int32_t> runOnce()
42-
{
43-
TimeStamp beforeInit(TimeStamp::now());
44-
for(int32_t i = 0; i < numPipes ; i++)
45-
{
46-
std::shared_ptr<Channel> channel = channels[i];
47-
channel->setReadCallback(std::bind(readCallback,channel->getfd(),i));
48-
channel->enableReading();
49-
}
50-
51-
int32_t space = numPipes / numActive;
52-
space *= 2;
53-
for (int32_t i = 0; i < numActive; ++i)
54-
{
55-
::send(pipes[i * space + 1], "m", 1, 0);
56-
}
57-
58-
fired = numActive;
59-
reads = 0;
60-
writes = numWrites;
61-
TimeStamp beforeLoop(TimeStamp::now());
62-
loop->run();
63-
64-
TimeStamp end(TimeStamp::now());
65-
66-
int32_t iterTime = static_cast<int32_t>(end.getMicroSecondsSinceEpoch() - beforeInit.getMicroSecondsSinceEpoch());
67-
int32_t loopTime = static_cast<int32_t>(end.getMicroSecondsSinceEpoch() - beforeLoop.getMicroSecondsSinceEpoch());
68-
return std::make_pair(iterTime,loopTime);
38+
std::pair <int32_t, int32_t> runOnce() {
39+
TimeStamp beforeInit(TimeStamp::now());
40+
for (int32_t i = 0; i < numPipes; i++) {
41+
std::shared_ptr <Channel> channel = channels[i];
42+
channel->setReadCallback(std::bind(readCallback, channel->getfd(), i));
43+
channel->enableReading();
44+
}
45+
46+
int32_t space = numPipes / numActive;
47+
space *= 2;
48+
for (int32_t i = 0; i < numActive; ++i) {
49+
::send(pipes[i * space + 1], "m", 1, 0);
50+
}
51+
52+
fired = numActive;
53+
reads = 0;
54+
writes = numWrites;
55+
TimeStamp beforeLoop(TimeStamp::now());
56+
loop->run();
57+
58+
TimeStamp end(TimeStamp::now());
59+
60+
int32_t iterTime = static_cast<int32_t>(end.getMicroSecondsSinceEpoch() - beforeInit.getMicroSecondsSinceEpoch());
61+
int32_t loopTime = static_cast<int32_t>(end.getMicroSecondsSinceEpoch() - beforeLoop.getMicroSecondsSinceEpoch());
62+
return std::make_pair(iterTime, loopTime);
6963
}
7064

71-
int main(int argc,char* argv[])
72-
{
73-
numPipes = 100;
74-
numActive = 1;
75-
numWrites = 100;
76-
int32_t c;
77-
78-
while ((c = getopt(argc,argv,"n:a:w:")) != -1)
79-
{
80-
switch (c)
81-
{
82-
case 'n':
83-
numPipes = atoi(optarg);
84-
break;
85-
case 'a':
86-
numActive = atoi(optarg);
87-
break;
88-
case 'w':
89-
numWrites = atoi(optarg);
90-
break;
91-
default:
92-
fprintf(stderr, "Illegal argument \"%c\"\n", c);
93-
return 1;
94-
}
95-
}
96-
97-
struct rlimit rl;
98-
rl.rlim_cur = rl.rlim_max = numPipes * 2 + 50;
99-
if(::setrlimit(RLIMIT_NOFILE,&rl) == -1)
100-
{
101-
perror("setrlimit");
102-
}
103-
104-
pipes.resize(2 * numPipes);
105-
for(int32_t i = 0; i < numPipes; ++i)
106-
{
107-
if (::socketpair(AF_UNIX,SOCK_STREAM,0,&pipes[i*2]) == -1)
108-
{
109-
perror("pipe");
110-
return 1;
111-
}
112-
}
113-
114-
EventLoop lop;
115-
loop = &lop;
116-
117-
for(int32_t i = 0 ; i < numPipes; i ++)
118-
{
119-
std::shared_ptr<Channel> channel(new Channel(loop,pipes[i*2]));
120-
channels.push_back(channel);
121-
}
122-
123-
for (int32_t i = 0; i < 25; ++i)
124-
{
125-
std::pair<int32_t,int32_t> t = runOnce();
126-
printf("%8d %8d\n",t.first,t.second);
127-
}
128-
129-
for(auto &it : channels)
130-
{
131-
it.disableAll();
132-
it.remove();
133-
}
134-
135-
channels.clear();
136-
return 0;
65+
int main(int argc, char *argv[]) {
66+
numPipes = 100;
67+
numActive = 1;
68+
numWrites = 100;
69+
int32_t c;
70+
71+
while ((c = getopt(argc, argv, "n:a:w:")) != -1) {
72+
switch (c) {
73+
case 'n':
74+
numPipes = atoi(optarg);
75+
break;
76+
case 'a':
77+
numActive = atoi(optarg);
78+
break;
79+
case 'w':
80+
numWrites = atoi(optarg);
81+
break;
82+
default:
83+
fprintf(stderr, "Illegal argument \"%c\"\n", c);
84+
return 1;
85+
}
86+
}
87+
88+
struct rlimit rl;
89+
rl.rlim_cur = rl.rlim_max = numPipes * 2 + 50;
90+
if (::setrlimit(RLIMIT_NOFILE, &rl) == -1) {
91+
perror("setrlimit");
92+
}
93+
94+
pipes.resize(2 * numPipes);
95+
for (int32_t i = 0; i < numPipes; ++i) {
96+
if (::socketpair(AF_UNIX, SOCK_STREAM, 0, &pipes[i * 2]) == -1) {
97+
perror("pipe");
98+
return 1;
99+
}
100+
}
101+
102+
EventLoop lop;
103+
loop = &lop;
104+
105+
for (int32_t i = 0; i < numPipes; i++) {
106+
std::shared_ptr <Channel> channel(new Channel(loop, pipes[i * 2]));
107+
channels.push_back(channel);
108+
}
109+
110+
for (int32_t i = 0; i < 25; ++i) {
111+
std::pair <int32_t, int32_t> t = runOnce();
112+
printf("%8d %8d\n", t.first, t.second);
113+
}
114+
115+
for (auto &it : channels) {
116+
it.disableAll();
117+
it.remove();
118+
}
119+
120+
channels.clear();
121+
return 0;
137122
}
138123

139124

0 commit comments

Comments
 (0)