Skip to content

Commit 627416c

Browse files
authored
A basic implementation of vertex-cut fragment to scale-out on large graphs. (#172)
1 parent 27ea3d2 commit 627416c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2774
-667
lines changed

CMakeLists.txt

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ option(USE_JEMALLOC "Whether to use jemalloc." OFF)
1414
option(USE_HUGEPAGES "Whether to use hugepages." OFF)
1515
option(BUILD_SHARED_LIBS "Whether to build libgrape-lite as shared library" ON)
1616
option(PROFILING "Whether to enable profiling" OFF)
17+
option(TRACKING_MEMORY "Whether to enable memory tracking" OFF)
1718
option(WITH_ASAN "Build with Address Sanitizer" OFF)
1819
option(BUILD_LIBGRAPELITE_DOCS "Build libgrape-lite documentation" ON)
1920
option(BUILD_LIBGRAPELITE_TESTS "Build libgrape-lite test cases" ON)
@@ -38,6 +39,11 @@ if (PROFILING)
3839
add_definitions(-DPROFILING)
3940
endif ()
4041

42+
if (TRACKING_MEMORY)
43+
message(STATUS "Enable memory tracking")
44+
add_definitions(-DTRACKING_MEMORY)
45+
endif ()
46+
4147
if (WCC_USE_GID)
4248
add_definitions(-DWCC_USE_GID)
4349
endif ()

examples/analytical_apps/bc/bc_context.h

-5
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@ class BCContext : public VertexDataContext<FRAG_T, float> {
5656
LOG(INFO) << "[frag-" << frag.fid()
5757
<< "] BC(0) = " << centrality_value[s];
5858
}
59-
#ifdef PROFILING
60-
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
61-
VLOG(2) << "exec_time: " << exec_time << "s.";
62-
VLOG(2) << "postprocess_time: " << postprocess_time << "s.";
63-
#endif
6459
}
6560

6661
oid_t source_id;

examples/analytical_apps/bfs/bfs_opt_context.h

-5
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@ class BFSOptContext : public VertexDataContext<FRAG_T, int64_t> {
4949
for (auto v : inner_vertices) {
5050
os << frag.GetId(v) << " " << partial_result[v] << std::endl;
5151
}
52-
#ifdef PROFILING
53-
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
54-
VLOG(2) << "exec_time: " << exec_time << "s.";
55-
VLOG(2) << "postprocess_time: " << postprocess_time << "s.";
56-
#endif
5752
}
5853

5954
oid_t source_id;

examples/analytical_apps/flags.cc

+3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ DEFINE_string(serialization_prefix, "",
6161
DEFINE_int32(app_concurrency, -1, "concurrency of application");
6262
DEFINE_int32(load_concurrency, 1, "concurrency of loading graph");
6363

64+
DEFINE_bool(vc, false, "whether to use vertex-cut storage.");
65+
DEFINE_bool(single_scan_load, true, "whether to load graph in single scan.");
66+
6467
DEFINE_string(lb, "cta",
6568
"Load balancing policy, these options can be used: "
6669
" none, cta, cm, wm, strict");

examples/analytical_apps/flags.h

+3
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,8 @@ DECLARE_string(serialization_prefix);
5353
DECLARE_int32(app_concurrency);
5454
DECLARE_int32(load_concurrency);
5555

56+
DECLARE_bool(vc);
57+
DECLARE_bool(single_scan_load);
58+
5659
DECLARE_string(lb);
5760
#endif // EXAMPLES_ANALYTICAL_APPS_FLAGS_H_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
#ifndef EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_
17+
#define EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_
18+
19+
#include <grape/grape.h>
20+
21+
#include "pagerank/pagerank_vc_context.h"
22+
23+
namespace grape {
24+
25+
template <typename T>
26+
struct NumericSum {
27+
static T init() { return 0; }
28+
29+
static void aggregate(T& a, const T& b) { a += b; }
30+
};
31+
32+
template <typename FRAG_T>
33+
class PageRankVC
34+
: public GatherScatterAppBase<FRAG_T, PageRankVCContext<FRAG_T>>,
35+
public ParallelEngine,
36+
public Communicator {
37+
using vertex_t = Vertex<typename FRAG_T::oid_t>;
38+
39+
public:
40+
INSTALL_GATHER_SCATTER_WORKER(PageRankVC<FRAG_T>, PageRankVCContext<FRAG_T>,
41+
FRAG_T)
42+
43+
void PEval(const fragment_t& frag, context_t& ctx,
44+
message_manager_t& messages) {
45+
if (ctx.max_round <= 0) {
46+
return;
47+
}
48+
49+
ctx.step = 0;
50+
ctx.graph_vnum = frag.GetTotalVerticesNum();
51+
52+
{
53+
#ifdef PROFILING
54+
ctx.t0 -= GetCurrentTime();
55+
#endif
56+
typename fragment_t::template both_vertex_array_t<int> degree(
57+
frag.Vertices(), 0);
58+
#ifdef TRACKING_MEMORY
59+
// allocate degree array for both src and dst vertices
60+
MemoryTracker::GetInstance().allocate(frag.Vertices().size() *
61+
sizeof(int));
62+
#endif
63+
int bucket_num = frag.GetBucketNum();
64+
int concurrency = thread_num();
65+
if (bucket_num < (concurrency / 2)) {
66+
ForEach(
67+
frag.GetEdges().begin(), frag.GetEdges().end(),
68+
[&](int tid, const typename fragment_t::edge_t& e) {
69+
atomic_add(degree[vertex_t(e.src)], 1);
70+
atomic_add(degree[vertex_t(e.dst)], 1);
71+
},
72+
4096);
73+
} else {
74+
ForEach(0, bucket_num,
75+
[&degree, &frag, bucket_num](int tid, int bucket_id) {
76+
for (int i = 0; i < bucket_num; ++i) {
77+
for (auto& e : frag.GetEdgesOfBucket(i, bucket_id)) {
78+
degree[vertex_t(e.dst)] += 1;
79+
}
80+
}
81+
for (int i = 0; i < bucket_num; ++i) {
82+
for (auto& e : frag.GetEdgesOfBucket(bucket_id, i)) {
83+
degree[vertex_t(e.src)] += 1;
84+
}
85+
}
86+
});
87+
}
88+
#ifdef PROFILING
89+
ctx.t0 += GetCurrentTime();
90+
#endif
91+
92+
messages.GatherMasterVertices<fragment_t, int, NumericSum<int>>(
93+
frag, degree, ctx.master_degree);
94+
#ifdef TRACKING_MEMORY
95+
// deallocate degree array for both src and dst vertices
96+
MemoryTracker::GetInstance().deallocate(frag.Vertices().size() *
97+
sizeof(int));
98+
#endif
99+
}
100+
101+
double p = 1.0 / ctx.graph_vnum;
102+
int64_t dangling_vnum_local = 0;
103+
#ifdef PROFILING
104+
ctx.t1 -= GetCurrentTime();
105+
#endif
106+
std::vector<int64_t> dangling_vnum_local_vec(thread_num(), 0);
107+
ForEach(frag.MasterVertices(), [&](int tid, vertex_t v) {
108+
if (ctx.master_degree[v] == 0) {
109+
++dangling_vnum_local_vec[tid];
110+
ctx.master_result[v] = p;
111+
} else {
112+
ctx.master_result[v] = p / ctx.master_degree[v];
113+
}
114+
});
115+
for (auto x : dangling_vnum_local_vec) {
116+
dangling_vnum_local += x;
117+
}
118+
#ifdef PROFILING
119+
ctx.t1 += GetCurrentTime();
120+
#endif
121+
122+
Sum(dangling_vnum_local, ctx.total_dangling_vnum);
123+
ctx.dangling_sum = p * ctx.total_dangling_vnum;
124+
125+
messages.ScatterMasterVertices<fragment_t, double>(frag, ctx.master_result,
126+
ctx.curr_result);
127+
messages.ForceContinue();
128+
}
129+
130+
void IncEval(const fragment_t& frag, context_t& ctx,
131+
message_manager_t& messages) {
132+
if (ctx.step == 0) {
133+
messages.AllocateGatherBuffers<fragment_t, double>(frag);
134+
}
135+
++ctx.step;
136+
137+
double base = (1.0 - ctx.delta) / ctx.graph_vnum +
138+
ctx.delta * ctx.dangling_sum / ctx.graph_vnum;
139+
ctx.dangling_sum = base * ctx.total_dangling_vnum;
140+
141+
ForEach(frag.Vertices(),
142+
[&ctx](int tid, vertex_t v) { ctx.next_result[v] = 0; });
143+
144+
int bucket_num = frag.GetBucketNum();
145+
int concurrency = thread_num();
146+
147+
#ifdef PROFILING
148+
ctx.t2 -= GetCurrentTime();
149+
#endif
150+
if (bucket_num < (concurrency / 2)) {
151+
ForEach(
152+
frag.GetEdges().begin(), frag.GetEdges().end(),
153+
[&ctx](int tid, const typename fragment_t::edge_t& e) {
154+
atomic_add(ctx.next_result[vertex_t(e.dst)],
155+
ctx.curr_result[vertex_t(e.src)]);
156+
atomic_add(ctx.next_result[vertex_t(e.src)],
157+
ctx.curr_result[vertex_t(e.dst)]);
158+
},
159+
4096);
160+
} else {
161+
ForEach(0, bucket_num, [&ctx, &frag, bucket_num](int tid, int bucket_id) {
162+
for (int i = 0; i < bucket_num; ++i) {
163+
for (auto& e : frag.GetEdgesOfBucket(i, bucket_id)) {
164+
ctx.next_result[vertex_t(e.dst)] +=
165+
ctx.curr_result[vertex_t(e.src)];
166+
}
167+
}
168+
for (int i = 0; i < bucket_num; ++i) {
169+
for (auto& e : frag.GetEdgesOfBucket(bucket_id, i)) {
170+
ctx.next_result[vertex_t(e.src)] +=
171+
ctx.curr_result[vertex_t(e.dst)];
172+
}
173+
}
174+
});
175+
}
176+
177+
#ifdef PROFILING
178+
ctx.t2 += GetCurrentTime();
179+
#endif
180+
181+
messages.GatherMasterVertices<fragment_t, double, NumericSum<double>>(
182+
frag, ctx.next_result, ctx.master_result);
183+
184+
if (ctx.step != ctx.max_round) {
185+
#ifdef PROFILING
186+
ctx.t1 -= GetCurrentTime();
187+
#endif
188+
ForEach(frag.MasterVertices(), [&ctx, base](int tid, vertex_t v) {
189+
if (ctx.master_degree[v] > 0) {
190+
ctx.master_result[v] =
191+
(base + ctx.delta * ctx.master_result[v]) / ctx.master_degree[v];
192+
} else {
193+
ctx.master_result[v] = base;
194+
}
195+
});
196+
#ifdef PROFILING
197+
ctx.t1 += GetCurrentTime();
198+
#endif
199+
200+
messages.ScatterMasterVertices<fragment_t, double>(
201+
frag, ctx.master_result, ctx.curr_result);
202+
messages.ForceContinue();
203+
} else {
204+
#ifdef PROFILING
205+
ctx.t1 -= GetCurrentTime();
206+
#endif
207+
ForEach(frag.MasterVertices(), [&ctx, base](int tid, vertex_t v) {
208+
ctx.master_result[v] = ctx.master_result[v] * ctx.delta + base;
209+
});
210+
#ifdef PROFILING
211+
ctx.t1 += GetCurrentTime();
212+
#endif
213+
}
214+
}
215+
};
216+
217+
} // namespace grape
218+
219+
#endif // EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
#ifndef EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_
17+
#define EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_
18+
19+
#include "grape/utils/memory_tracker.h"
20+
21+
#include <iomanip>
22+
23+
namespace grape {
24+
25+
template <typename FRAG_T>
26+
class PageRankVCContext : public VertexDataContext<FRAG_T, double> {
27+
using oid_t = typename FRAG_T::oid_t;
28+
29+
public:
30+
explicit PageRankVCContext(const FRAG_T& fragment)
31+
: VertexDataContext<FRAG_T, double>(fragment),
32+
master_result(this->data()) {
33+
curr_result.Init(fragment.Vertices());
34+
next_result.Init(fragment.Vertices());
35+
master_degree.Init(fragment.MasterVertices());
36+
37+
#ifdef TRACKING_MEMORY
38+
MemoryTracker::GetInstance().allocate(fragment.Vertices().size() *
39+
sizeof(double));
40+
MemoryTracker::GetInstance().allocate(fragment.Vertices().size() *
41+
sizeof(double));
42+
MemoryTracker::GetInstance().allocate(fragment.MasterVertices().size() *
43+
sizeof(double));
44+
MemoryTracker::GetInstance().allocate(fragment.MasterVertices().size() *
45+
sizeof(int));
46+
#endif
47+
}
48+
49+
void Init(GatherScatterMessageManager& messages, double delta,
50+
int max_round) {
51+
this->delta = delta;
52+
this->max_round = max_round;
53+
step = 0;
54+
}
55+
56+
void Output(std::ostream& os) {
57+
auto& frag = this->fragment();
58+
auto master_vertices = frag.MasterVertices();
59+
for (auto v : master_vertices) {
60+
os << v.GetValue() << " " << std::scientific << std::setprecision(15)
61+
<< master_result[v] << std::endl;
62+
}
63+
64+
#ifdef PROFILING
65+
VLOG(2) << "[frag-" << frag.fid() << "]: init degree: " << t0 << " s, "
66+
<< "calc master result: " << t1 << " s, "
67+
<< "propogate: " << t2 << " s";
68+
#endif
69+
}
70+
71+
typename FRAG_T::template both_vertex_array_t<double> curr_result;
72+
typename FRAG_T::template both_vertex_array_t<double> next_result;
73+
typename FRAG_T::template vertex_array_t<double>& master_result;
74+
typename FRAG_T::template vertex_array_t<int> master_degree;
75+
76+
int64_t total_dangling_vnum = 0;
77+
int64_t graph_vnum;
78+
int step = 0;
79+
int max_round = 0;
80+
double delta = 0;
81+
82+
double dangling_sum = 0.0;
83+
84+
#ifdef PROFILING
85+
double t0 = 0; // init degree
86+
double t1 = 0; // calc master result
87+
double t2 = 0; // propogate
88+
#endif
89+
};
90+
91+
} // namespace grape
92+
93+
#endif // EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_

examples/analytical_apps/run_app.cc

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
#include <glog/logging.h>
2121

2222
#include "run_app_opt.h"
23+
#include "run_app_vc.h"
2324

2425
int main(int argc, char* argv[]) {
2526
FLAGS_stderrthreshold = 0;
@@ -41,6 +42,8 @@ int main(int argc, char* argv[]) {
4142
std::string name = FLAGS_application;
4243
if (FLAGS_opt) {
4344
grape::RunOpt();
45+
} else if (FLAGS_vc) {
46+
grape::RunVC();
4447
} else {
4548
if (name.find("sssp") != std::string::npos) {
4649
grape::Run<int64_t, uint32_t, grape::EmptyType, double>();

0 commit comments

Comments
 (0)