Skip to content

Commit b3e16c9

Browse files
authored
Improve performance for examples. (alibaba#142)
1 parent dba4938 commit b3e16c9

File tree

85 files changed

+6332
-2560
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+6332
-2560
lines changed

CMakeLists.txt

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ option(BUILD_LIBGRAPELITE_DOCS "Build libgrape-lite documentation" ON)
1919
option(BUILD_LIBGRAPELITE_TESTS "Build libgrape-lite test cases" ON)
2020
option(WCC_USE_GID "Use global id as wcc component id" OFF)
2121

22-
if (USE_HUGEPAGES AND LINUX)
23-
add_definitions(-DUSE_HUGEPAGES)
22+
# Pass -DUSE_HUGEPAGES to the compiler only when the USE_HUGEPAGES option is
# set and CMAKE_SYSTEM_NAME matches "Linux"; a status line is printed when the
# definition is added.
if (USE_HUGEPAGES)
23+
if (CMAKE_SYSTEM_NAME MATCHES "Linux")
24+
message(STATUS "Enable hugepage")
25+
add_definitions(-DUSE_HUGEPAGES)
26+
endif ()
2427
endif ()
2528

2629
if (PROFILING)
+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
#ifndef EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_H_
17+
#define EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_H_
18+
19+
#include <grape/grape.h>
20+
21+
#include "bfs/bfs_opt_context.h"
22+
23+
namespace grape {
24+
25+
/**
26+
* @brief An implementation of BFS, the version in LDBC, which can work
27+
* on both directed and undirected graphs.
28+
*
29+
* This version of BFS inherits ParallelAppBase. Messages can be sent in
30+
* parallel to the evaluation. This strategy improves performance by overlapping
31+
* the communication time and the evaluation time.
32+
*
33+
* @tparam FRAG_T
34+
*/
35+
template <typename FRAG_T>
36+
class BFSOpt : public ParallelAppBase<FRAG_T, BFSOptContext<FRAG_T>,
37+
ParallelMessageManagerOpt>,
38+
public ParallelEngine {
39+
INSTALL_PARALLEL_OPT_WORKER(BFSOpt<FRAG_T>, BFSOptContext<FRAG_T>, FRAG_T)
40+
using vertex_t = typename fragment_t::vertex_t;
41+
42+
static constexpr bool need_split_edges = true;
43+
44+
void PEval(const fragment_t& frag, context_t& ctx,
45+
message_manager_t& messages) {
46+
using depth_type = typename context_t::depth_type;
47+
48+
messages.InitChannels(thread_num(), 2 * 1023 * 64, 2 * 1024 * 64);
49+
50+
ctx.current_depth = 1;
51+
52+
vertex_t source;
53+
bool native_source = frag.GetInnerVertex(ctx.source_id, source);
54+
55+
auto inner_vertices = frag.InnerVertices();
56+
57+
// init double buffer which contains updated vertices using bitmap
58+
ctx.curr_inner_updated.Init(inner_vertices, GetThreadPool());
59+
ctx.next_inner_updated.Init(inner_vertices, GetThreadPool());
60+
61+
auto& channel_0 = messages.Channels()[0];
62+
63+
// run first round BFS, update unreached vertices
64+
if (native_source) {
65+
ctx.partial_result[source] = 0;
66+
auto oes = frag.GetOutgoingAdjList(source);
67+
for (auto& e : oes) {
68+
auto u = e.get_neighbor();
69+
if (ctx.partial_result[u] == std::numeric_limits<depth_type>::max()) {
70+
ctx.partial_result[u] = 1;
71+
if (frag.IsOuterVertex(u)) {
72+
channel_0.template SyncStateOnOuterVertex<fragment_t>(frag, u);
73+
} else {
74+
ctx.curr_inner_updated.Insert(u);
75+
}
76+
}
77+
}
78+
}
79+
80+
messages.ForceContinue();
81+
}
82+
83+
void IncEval(const fragment_t& frag, context_t& ctx,
84+
message_manager_t& messages) {
85+
using depth_type = typename context_t::depth_type;
86+
87+
auto& channels = messages.Channels();
88+
89+
depth_type next_depth = ctx.current_depth + 1;
90+
ctx.next_inner_updated.ParallelClear(GetThreadPool());
91+
92+
// process received messages and update depth
93+
messages.ParallelProcess<fragment_t, EmptyType>(
94+
thread_num(), frag, [&ctx](int tid, vertex_t v, EmptyType) {
95+
if (ctx.partial_result[v] == std::numeric_limits<depth_type>::max()) {
96+
ctx.partial_result[v] = ctx.current_depth;
97+
ctx.curr_inner_updated.Insert(v);
98+
}
99+
});
100+
101+
// sync messages to other workers
102+
auto ivnum = frag.GetInnerVerticesNum();
103+
auto active = ctx.curr_inner_updated.ParallelCount(GetThreadPool());
104+
double rate = static_cast<double>(active) / static_cast<double>(ivnum);
105+
if (rate > 0.005) {
106+
auto inner_vertices = frag.InnerVertices();
107+
auto outer_vertices = frag.OuterVertices();
108+
ForEach(outer_vertices, [next_depth, &frag, &ctx, &channels](int tid,
109+
vertex_t v) {
110+
if (ctx.partial_result[v] == std::numeric_limits<depth_type>::max()) {
111+
auto ies = frag.GetIncomingAdjList(v);
112+
for (auto& e : ies) {
113+
auto u = e.get_neighbor();
114+
if (ctx.curr_inner_updated.Exist(u)) {
115+
ctx.partial_result[v] = next_depth;
116+
channels[tid].template SyncStateOnOuterVertex<fragment_t>(frag,
117+
v);
118+
break;
119+
}
120+
}
121+
}
122+
});
123+
if (frag.directed()) {
124+
ForEach(inner_vertices, [next_depth, &frag, &ctx](int tid, vertex_t v) {
125+
if (ctx.partial_result[v] == std::numeric_limits<depth_type>::max()) {
126+
auto ies = frag.GetIncomingInnerVertexAdjList(v);
127+
for (auto& e : ies) {
128+
auto u = e.get_neighbor();
129+
if (ctx.curr_inner_updated.Exist(u)) {
130+
ctx.partial_result[v] = next_depth;
131+
ctx.next_inner_updated.Insert(v);
132+
break;
133+
}
134+
}
135+
}
136+
});
137+
} else {
138+
ForEach(inner_vertices, [next_depth, &frag, &ctx](int tid, vertex_t v) {
139+
if (ctx.partial_result[v] == std::numeric_limits<depth_type>::max()) {
140+
auto oes = frag.GetOutgoingInnerVertexAdjList(v);
141+
for (auto& e : oes) {
142+
auto u = e.get_neighbor();
143+
if (ctx.curr_inner_updated.Exist(u)) {
144+
ctx.partial_result[v] = next_depth;
145+
ctx.next_inner_updated.Insert(v);
146+
break;
147+
}
148+
}
149+
}
150+
});
151+
}
152+
} else if (active > 0) {
153+
ForEach(ctx.curr_inner_updated, [next_depth, &frag, &ctx, &channels](
154+
int tid, vertex_t v) {
155+
auto oes = frag.GetOutgoingAdjList(v);
156+
for (auto& e : oes) {
157+
auto u = e.get_neighbor();
158+
if (ctx.partial_result[u] == std::numeric_limits<depth_type>::max()) {
159+
ctx.partial_result[u] = next_depth;
160+
if (frag.IsOuterVertex(u)) {
161+
channels[tid].template SyncStateOnOuterVertex<fragment_t>(frag,
162+
u);
163+
} else {
164+
ctx.next_inner_updated.Insert(u);
165+
}
166+
}
167+
}
168+
});
169+
}
170+
171+
ctx.current_depth = next_depth;
172+
if (!ctx.next_inner_updated.Empty()) {
173+
messages.ForceContinue();
174+
}
175+
176+
ctx.next_inner_updated.Swap(ctx.curr_inner_updated);
177+
}
178+
};
179+
180+
} // namespace grape
181+
182+
#endif // EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_H_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
#ifndef EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_CONTEXT_H_
17+
#define EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_CONTEXT_H_
18+
19+
#include <grape/grape.h>
20+
21+
#include <limits>
22+
23+
namespace grape {
24+
/**
25+
* @brief Context for the parallel version of BFS.
26+
*
27+
* @tparam FRAG_T
28+
*/
29+
template <typename FRAG_T>
30+
class BFSOptContext : public VertexDataContext<FRAG_T, int64_t> {
31+
public:
32+
using depth_type = int64_t;
33+
using oid_t = typename FRAG_T::oid_t;
34+
using vid_t = typename FRAG_T::vid_t;
35+
36+
explicit BFSOptContext(const FRAG_T& fragment)
37+
: VertexDataContext<FRAG_T, int64_t>(fragment, true),
38+
partial_result(this->data()) {}
39+
40+
void Init(ParallelMessageManagerOpt& messages, oid_t src_id) {
41+
source_id = src_id;
42+
partial_result.SetValue(std::numeric_limits<depth_type>::max());
43+
}
44+
45+
void Output(std::ostream& os) override {
46+
auto& frag = this->fragment();
47+
auto inner_vertices = frag.InnerVertices();
48+
49+
for (auto v : inner_vertices) {
50+
os << frag.GetId(v) << " " << partial_result[v] << std::endl;
51+
}
52+
#ifdef PROFILING
53+
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
54+
VLOG(2) << "exec_time: " << exec_time << "s.";
55+
VLOG(2) << "postprocess_time: " << postprocess_time << "s.";
56+
#endif
57+
}
58+
59+
oid_t source_id;
60+
typename FRAG_T::template vertex_array_t<depth_type>& partial_result;
61+
DenseVertexSet<typename FRAG_T::inner_vertices_t> curr_inner_updated,
62+
next_inner_updated;
63+
64+
depth_type current_depth = 0;
65+
};
66+
} // namespace grape
67+
68+
#endif // EXAMPLES_ANALYTICAL_APPS_BFS_BFS_OPT_CONTEXT_H_

0 commit comments

Comments
 (0)