1
+ /* * Copyright 2020 Alibaba Group Holding Limited.
2
+
3
+ Licensed under the Apache License, Version 2.0 (the "License");
4
+ you may not use this file except in compliance with the License.
5
+ You may obtain a copy of the License at
6
+
7
+ http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+ Unless required by applicable law or agreed to in writing, software
10
+ distributed under the License is distributed on an "AS IS" BASIS,
11
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ See the License for the specific language governing permissions and
13
+ limitations under the License.
14
+ */
15
+
16
+ #ifndef EXAMPLES_ANALYTICAL_APPS_BC_BC_H_
17
+ #define EXAMPLES_ANALYTICAL_APPS_BC_BC_H_
18
+
19
+ #include < grape/grape.h>
20
+
21
+ #include " bc/bc_context.h"
22
+
23
+ namespace grape {
24
+
25
+ /* *
26
+ * @brief An implementation of BC(betweeness centrality), which can work
27
+ * on undirected graph.
28
+ *
29
+ * In this version Breadth-First Search (BFS) and centrality calculations will
30
+ * be executed sequentially. During the first phase, global synchronization is
31
+ * required in each round to determine whether the BFS has converged. When the
32
+ * number of rounds is substantial, the overhead introduced by synchronization
33
+ * becomes notably significant.
34
+ *
35
+ * @tparam FRAG_T
36
+ */
37
+ template <typename FRAG_T>
38
+ class BC : public ParallelAppBase <FRAG_T, BCContext<FRAG_T>,
39
+ ParallelMessageManagerOpt>,
40
+ public ParallelEngine,
41
+ public Communicator {
42
+ INSTALL_PARALLEL_OPT_WORKER (BC<FRAG_T>, BCContext<FRAG_T>, FRAG_T)
43
+ using vertex_t = typename fragment_t ::vertex_t ;
44
+
45
+ void PEval (const fragment_t & frag, context_t & ctx,
46
+ message_manager_t & messages) {
47
+ using depth_type = typename context_t ::depth_type;
48
+
49
+ messages.InitChannels (thread_num ());
50
+
51
+ ctx.current_depth = 1 ;
52
+
53
+ vertex_t source;
54
+ bool native_source = frag.GetInnerVertex (ctx.source_id , source);
55
+
56
+ auto inner_vertices = frag.InnerVertices ();
57
+ auto outer_vertices = frag.OuterVertices ();
58
+
59
+ // init double buffer which contains updated vertices using bitmap
60
+ ctx.curr_inner_updated .Init (inner_vertices, GetThreadPool ());
61
+ ctx.next_inner_updated .Init (inner_vertices, GetThreadPool ());
62
+ ctx.outer_updated .Init (outer_vertices, GetThreadPool ());
63
+
64
+ ctx.path_num .Init (frag.Vertices (), 0 );
65
+ ctx.partial_result .Init (frag.Vertices (),
66
+ std::numeric_limits<depth_type>::max ());
67
+
68
+ auto & channel_0 = messages.Channels ()[0 ];
69
+
70
+ // run first round BFS, update unreached vertices
71
+ if (native_source) {
72
+ ctx.partial_result [source] = 0 ;
73
+ ctx.path_num [source] = 1 ;
74
+ auto oes = frag.GetOutgoingAdjList (source);
75
+ for (auto & e : oes) {
76
+ auto u = e.get_neighbor ();
77
+ if (ctx.partial_result [u] == std::numeric_limits<depth_type>::max ()) {
78
+ ctx.partial_result [u] = 1 ;
79
+ ctx.path_num [u] = 1 ;
80
+ if (frag.IsOuterVertex (u)) {
81
+ channel_0.template SyncStateOnOuterVertex <fragment_t , double >(frag,
82
+ u, 1 );
83
+ } else {
84
+ ctx.curr_inner_updated .Insert (u);
85
+ }
86
+ }
87
+ }
88
+ }
89
+
90
+ messages.ForceContinue ();
91
+ ctx.stage = 0 ;
92
+ }
93
+
94
+ void IncEval (const fragment_t & frag, context_t & ctx,
95
+ message_manager_t & messages) {
96
+ using depth_type = typename context_t ::depth_type;
97
+
98
+ auto & channels = messages.Channels ();
99
+
100
+ if (ctx.stage == 0 ) {
101
+ depth_type next_depth = ctx.current_depth + 1 ;
102
+ ctx.next_inner_updated .ParallelClear (GetThreadPool ());
103
+
104
+ // process received messages and update depth
105
+ messages.ParallelProcess <fragment_t , double >(
106
+ thread_num (), frag, [&ctx](int tid, vertex_t v, double pn) {
107
+ if (ctx.partial_result [v] ==
108
+ std::numeric_limits<depth_type>::max ()) {
109
+ ctx.partial_result [v] = ctx.current_depth ;
110
+ atomic_add (ctx.path_num [v], pn);
111
+ ctx.curr_inner_updated .Insert (v);
112
+ } else if (ctx.partial_result [v] == ctx.current_depth ) {
113
+ atomic_add (ctx.path_num [v], pn);
114
+ }
115
+ });
116
+
117
+ // sync messages to other workers
118
+ ForEach (ctx.curr_inner_updated , [next_depth, &frag, &ctx, &channels](
119
+ int tid, vertex_t v) {
120
+ auto oes = frag.GetOutgoingAdjList (v);
121
+ double pn = ctx.path_num [v];
122
+ for (auto & e : oes) {
123
+ auto u = e.get_neighbor ();
124
+ if (ctx.partial_result [u] == std::numeric_limits<depth_type>::max ()) {
125
+ atomic_add (ctx.path_num [u], pn);
126
+ ctx.partial_result [u] = next_depth;
127
+ if (frag.IsOuterVertex (u)) {
128
+ ctx.outer_updated .Insert (u);
129
+ } else {
130
+ ctx.next_inner_updated .Insert (u);
131
+ }
132
+ } else if (ctx.partial_result [u] == next_depth) {
133
+ atomic_add (ctx.path_num [u], pn);
134
+ }
135
+ }
136
+ });
137
+
138
+ int status = 0 ;
139
+ if (!ctx.outer_updated .Empty ()) {
140
+ status = 1 ;
141
+
142
+ ForEach (
143
+ ctx.outer_updated , [&frag, &ctx, &channels](int tid, vertex_t v) {
144
+ channels[tid].template SyncStateOnOuterVertex <fragment_t , double >(
145
+ frag, v, ctx.path_num [v]);
146
+ });
147
+ ctx.outer_updated .ParallelClear (GetThreadPool ());
148
+ } else if (!ctx.next_inner_updated .Empty ()) {
149
+ status = 1 ;
150
+ }
151
+ ctx.current_depth = next_depth;
152
+ ctx.next_inner_updated .Swap (ctx.curr_inner_updated );
153
+
154
+ int global_status = 0 ;
155
+ Sum<int >(status, global_status);
156
+
157
+ if (global_status == 0 ) {
158
+ depth_type curr_depth = ctx.current_depth - ctx.stage ;
159
+ auto inner_vertices = frag.InnerVertices ();
160
+
161
+ ForEach (inner_vertices, [&ctx, curr_depth, &frag](int tid, vertex_t v) {
162
+ if (ctx.partial_result [v] == curr_depth) {
163
+ float accum = static_cast <float >(1 ) / ctx.path_num [v];
164
+ auto es = frag.GetOutgoingAdjList (v);
165
+ for (auto & e : es) {
166
+ auto u = e.get_neighbor ();
167
+ if (frag.IsInnerVertex (u)) {
168
+ if (ctx.partial_result [u] == curr_depth - 1 ) {
169
+ atomic_add (ctx.centrality_value [u], accum);
170
+ }
171
+ } else {
172
+ atomic_add (ctx.centrality_value [u], accum);
173
+ ctx.outer_updated .Insert (u);
174
+ }
175
+ }
176
+ }
177
+ });
178
+
179
+ ForEach (ctx.outer_updated ,
180
+ [&frag, &ctx, &channels](int tid, vertex_t v) {
181
+ channels[tid].SyncStateOnOuterVertex <fragment_t , float >(
182
+ frag, v, ctx.centrality_value [v]);
183
+ ctx.centrality_value [v] = 0 ;
184
+ });
185
+ ctx.outer_updated .ParallelClear (GetThreadPool ());
186
+ ctx.stage = 1 ;
187
+ }
188
+
189
+ messages.ForceContinue ();
190
+ } else {
191
+ depth_type curr_depth = ctx.current_depth - ctx.stage ;
192
+
193
+ messages.ParallelProcess <fragment_t , float >(
194
+ thread_num (), frag,
195
+ [&ctx, curr_depth](int tid, vertex_t v, float bc) {
196
+ if (ctx.partial_result [v] == curr_depth) {
197
+ atomic_add (ctx.centrality_value [v], bc);
198
+ }
199
+ });
200
+ if (curr_depth > 0 ) {
201
+ auto inner_vertices = frag.InnerVertices ();
202
+
203
+ ForEach (inner_vertices, [&ctx, curr_depth, &frag](int tid, vertex_t v) {
204
+ if (ctx.partial_result [v] == curr_depth) {
205
+ ctx.centrality_value [v] *= ctx.path_num [v];
206
+ float accum = static_cast <float >(1 ) / ctx.path_num [v] *
207
+ (1.0 + ctx.centrality_value [v]);
208
+ auto es = frag.GetOutgoingAdjList (v);
209
+ for (auto & e : es) {
210
+ auto u = e.get_neighbor ();
211
+ if (frag.IsInnerVertex (u)) {
212
+ if (ctx.partial_result [u] == curr_depth - 1 ) {
213
+ atomic_add (ctx.centrality_value [u], accum);
214
+ }
215
+ } else {
216
+ atomic_add (ctx.centrality_value [u], accum);
217
+ ctx.outer_updated .Insert (u);
218
+ }
219
+ }
220
+ }
221
+ });
222
+
223
+ ForEach (ctx.outer_updated ,
224
+ [&frag, &ctx, &channels](int tid, vertex_t v) {
225
+ channels[tid].SyncStateOnOuterVertex <fragment_t , float >(
226
+ frag, v, ctx.centrality_value [v]);
227
+ ctx.centrality_value [v] = 0 ;
228
+ });
229
+ ctx.outer_updated .ParallelClear (GetThreadPool ());
230
+ messages.ForceContinue ();
231
+ }
232
+
233
+ ++ctx.stage ;
234
+ }
235
+ }
236
+ };
237
+
238
+ } // namespace grape
239
+
240
+ #endif // EXAMPLES_ANALYTICAL_APPS_BC_BC_H_
0 commit comments