@@ -1,4 +1,4 @@
-/** Copyright 2022 Alibaba Group Holding Limited.
+/** Copyright 2023 Alibaba Group Holding Limited.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -32,13 +32,6 @@ class BFSContext : public grape::VoidContext<FRAG_T> {
 
   explicit BFSContext(const FRAG_T& frag) : grape::VoidContext<FRAG_T>(frag) {}
 
-#ifdef PROFILING
-  ~BFSContext() {
-    LOG(INFO) << "Get msg time: " << get_msg_time * 1000;
-    LOG(INFO) << "BFS kernel time: " << traversal_kernel_time * 1000;
-  }
-#endif
-
   void Init(GPUMessageManager& messages, AppConfig app_config, oid_t src_id) {
     auto& frag = this->fragment();
    auto vertices = frag.Vertices();
@@ -50,7 +43,9 @@ class BFSContext : public grape::VoidContext<FRAG_T> {
     depth.Init(vertices, std::numeric_limits<depth_t>::max());
     depth.H2D();
     in_q.Init(iv.size());
-    out_q_local.Init(iv.size());
+    current_active_map.Init(iv);
+    next_active_map.Init(iv);
+    visited.Init(iv);
 
     messages.InitBuffer((sizeof(depth_t) + sizeof(vid_t)) * ov.size(),
                         (sizeof(depth_t) + sizeof(vid_t)) * iv.size());
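Note on the unchanged `InitBuffer` call above: the sizing reserves one `(depth_t, vid_t)` pair per outer vertex on one side and per inner vertex on the other (my reading is send and receive respectively, matching how `SyncStateOnOuterVertex` targets outer vertices). As a worked example with hypothetical 4-byte `vid_t` and 4-byte `depth_t`, a fragment with 1,000,000 outer vertices reserves (4 + 4) × 1,000,000 = 8 MB for that side of the buffer.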
@@ -68,14 +63,13 @@ class BFSContext : public grape::VoidContext<FRAG_T> {
   }
 
   oid_t src_id{};
+  double active_ratio;
   LoadBalancing lb{};
   depth_t curr_depth{};
   VertexArray<depth_t, vid_t> depth;
-  Queue<vertex_t, vid_t> in_q, out_q_local;
-#ifdef PROFILING
-  double get_msg_time{};
-  double traversal_kernel_time{};
-#endif
+  Queue<vertex_t, vid_t> in_q;
+  DenseVertexSet<vid_t> current_active_map, next_active_map;
+  DenseVertexSet<vid_t> visited;
 };
 
 template <typename FRAG_T>
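The queues give way to three `DenseVertexSet` frontiers: `current_active_map` (this round's frontier), `next_active_map` (the frontier being built), and `visited`. A dense vertex set is, in effect, a bitmap over the inner-vertex range, which is what makes the `Exist` probes in the pull loops cheap. Below is a minimal host-side analogue of the operations the diff relies on (illustrative only; the real `dev::DenseVertexSet` is a device-resident structure and this sketch does not reproduce its layout):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Bitmap-backed vertex set with the five operations used in this diff:
// Insert, Exist, Count, Clear, Swap.
class BitmapVertexSet {
 public:
  explicit BitmapVertexSet(size_t n) : bits_((n + 63) / 64, 0) {}

  void Insert(uint32_t v) { bits_[v >> 6] |= 1ull << (v & 63); }

  bool Exist(uint32_t v) const { return (bits_[v >> 6] >> (v & 63)) & 1; }

  size_t Count() const {  // population count over all 64-bit words
    size_t c = 0;
    for (uint64_t w : bits_) c += __builtin_popcountll(w);  // GCC/Clang builtin
    return c;
  }

  void Clear() { std::fill(bits_.begin(), bits_.end(), 0); }

  void Swap(BitmapVertexSet& other) { bits_.swap(other.bits_); }

 private:
  std::vector<uint64_t> bits_;
};
```

Unlike a queue, a bitmap cannot be iterated in frontier order, which is why the push branch in `IncEval` below rebuilds `in_q` by scanning all inner vertices and testing `Exist`; in exchange, inserts are idempotent (no duplicate frontier entries) and membership tests are O(1).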
@@ -89,6 +83,7 @@ class BFS : public GPUAppBase<FRAG_T, BFSContext<FRAG_T>>,
   using edata_t = typename fragment_t::edata_t;
   using vertex_t = typename dev_fragment_t::vertex_t;
   using nbr_t = typename dev_fragment_t::nbr_t;
+  static constexpr bool need_split_edges = true;
 
   void PEval(const fragment_t& frag, context_t& ctx,
              message_manager_t& messages) {
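On `need_split_edges = true`: my reading is that the flag asks the runtime to keep each vertex's adjacency list split into inner-vertex and outer-vertex segments, which is what makes the `GetIncomingInnerVertexAdjList` / `GetOutgoingInnerVertexAdjList` calls in the pull phase below available. The exact contract lives in the fragment implementation.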
@@ -101,88 +96,154 @@ class BFS : public GPUAppBase<FRAG_T, BFSContext<FRAG_T>>,
           messages.stream(),
           [=] __device__(dev_fragment_t d_frag,
                          dev::VertexArray<depth_t, vid_t> depth,
-                         dev::Queue<vertex_t, vid_t> in_q) {
+                         dev::DenseVertexSet<vid_t> d_current_active_map,
+                         dev::DenseVertexSet<vid_t> d_visited) {
             auto tid = TID_1D;
 
             if (tid == 0) {
               depth[source] = 0;
-              in_q.Append(source);
+              d_current_active_map.Insert(source);
+              d_visited.Insert(source);
             }
           },
           frag.DeviceObject(), ctx.depth.DeviceObject(),
-          ctx.in_q.DeviceObject());
+          ctx.current_active_map.DeviceObject(), ctx.visited.DeviceObject());
     }
     messages.ForceContinue();
   }
 
   void IncEval(const fragment_t& frag, context_t& ctx,
                message_manager_t& messages) {
     auto d_frag = frag.DeviceObject();
+    auto iv = frag.InnerVertices();
+    auto ov = frag.OuterVertices();
     auto d_depth = ctx.depth.DeviceObject();
     auto& in_q = ctx.in_q;
     auto d_in_q = in_q.DeviceObject();
-    auto& out_q_local = ctx.out_q_local;
-    auto d_out_q_local = out_q_local.DeviceObject();
+    auto& current_active_map = ctx.current_active_map;
+    auto d_current_active_map = current_active_map.DeviceObject();
+    auto& visited = ctx.visited;
+    auto d_visited = visited.DeviceObject();
+    auto& next_active_map = ctx.next_active_map;
+    auto d_next_active_map = next_active_map.DeviceObject();
     auto curr_depth = ctx.curr_depth;
     auto next_depth = curr_depth + 1;
     auto& stream = messages.stream();
     auto d_mm = messages.DeviceObject();
+    bool isDirected = frag.load_strategy == grape::LoadStrategy::kBothOutIn;
+
+    next_active_map.Clear(stream);
+    in_q.Clear(stream);
 
-#ifdef PROFILING
-    ctx.get_msg_time -= grape::GetCurrentTime();
-    auto process_msg_time = grape::GetCurrentTime();
-#endif
     messages.template ParallelProcess<dev_fragment_t, grape::EmptyType>(
         d_frag, [=] __device__(vertex_t v) mutable {
           assert(d_frag.IsInnerVertex(v));
 
           if (curr_depth < d_depth[v]) {
             d_depth[v] = curr_depth;
-            d_in_q.AppendWarp(v);
+            d_current_active_map.Insert(v);
+            d_visited.Insert(v);
           }
         });
-    auto in_size = in_q.size(stream);
-
-    WorkSourceArray<vertex_t> ws_in(in_q.data(), in_size);
 
-#ifdef PROFILING
-    VLOG(1) << "Frag " << frag.fid() << " In: " << in_size;
-    process_msg_time = grape::GetCurrentTime() - process_msg_time;
-    ctx.get_msg_time += grape::GetCurrentTime();
-    auto traversal_kernel_time = grape::GetCurrentTime();
-#endif
-
-    ForEachOutgoingEdge(
-        stream, d_frag, ws_in,
-        [=] __device__(const vertex_t& u, const nbr_t& nbr) mutable {
-          vertex_t v = nbr.get_neighbor();
-
-          if (next_depth < d_depth[v]) {
-            d_depth[v] = next_depth;
-
-            if (d_frag.IsInnerVertex(v)) {
-              d_out_q_local.AppendWarp(v);
-            } else {
+    auto ivnum = iv.size();
+    auto active = current_active_map.Count(stream);
+    auto visited_num = visited.Count(stream);
+    double active_ratio = (active + 0.0) / ivnum;
+    double visited_ratio = (visited_num + 0.0) / ivnum;
+    bool usePush = (2.5 * active_ratio < (1 - visited_ratio)) || (active == 0);
+    if (usePush) {
+      // push-based search
+      WorkSourceRange<vertex_t> ws_iv(*iv.begin(), iv.size());
+      ForEach(stream, ws_iv, [=] __device__(vertex_t v) mutable {
+        if (d_current_active_map.Exist(v)) {
+          d_in_q.AppendWarp(v);
+        }
+      });
+      WorkSourceArray<vertex_t> ws_in(in_q.data(), in_q.size(stream));
+
+      ForEachOutgoingEdge(
+          stream, d_frag, ws_in,
+          [=] __device__(const vertex_t& u, const nbr_t& nbr) mutable {
+            vertex_t v = nbr.get_neighbor();
+
+            if (next_depth < d_depth[v]) {
+              d_depth[v] = next_depth;
+              if (d_frag.IsInnerVertex(v)) {
+                d_next_active_map.Insert(v);
+                d_visited.Insert(v);
+              } else {
+                d_mm.SyncStateOnOuterVertex(d_frag, v);
+              }
+            }
+          },
+          ctx.lb);
+    } else {
+      // pull-based search
+      WorkSourceRange<vertex_t> ws_ov(*ov.begin(), ov.size());
+      depth_t MAX_DEPTH = std::numeric_limits<depth_t>::max();
+      ForEach(stream, ws_ov, [=] __device__(vertex_t v) mutable {
+        if (d_depth[v] == MAX_DEPTH) {
+          auto ies = d_frag.GetIncomingAdjList(v);
+          for (auto& e : ies) {
+            auto u = e.get_neighbor();
+            assert(d_frag.IsInnerVertex(u));
+            if (d_current_active_map.Exist(u)) {
+              d_depth[v] = next_depth;
               d_mm.SyncStateOnOuterVertex(d_frag, v);
+              break;
             }
           }
-        },
-        ctx.lb);
+        }
+      });
+
+      WorkSourceRange<vertex_t> ws_iv(*iv.begin(), iv.size());
+      ForEach(stream, ws_iv, [=] __device__(vertex_t v) mutable {
+        if (!d_visited.Exist(v)) {
+          d_in_q.AppendWarp(v);
+        }
+      });
+      WorkSourceArray<vertex_t> ws_in(in_q.data(), in_q.size(stream));
+
+      if (isDirected) {
+        ForEach(stream, ws_in, [=] __device__(vertex_t v) mutable {
+          auto ies = d_frag.GetIncomingInnerVertexAdjList(v);
+          for (auto& e : ies) {
+            auto u = e.get_neighbor();
+            if (d_current_active_map.Exist(u)) {
+              d_depth[v] = next_depth;
+              d_next_active_map.Insert(v);
+              d_visited.Insert(v);
+              break;
+            }
+          }
+        });
+      } else {
+        ForEach(stream, ws_in, [=] __device__(vertex_t v) mutable {
+          auto oes = d_frag.GetOutgoingInnerVertexAdjList(v);
+          for (auto& e : oes) {
+            auto u = e.get_neighbor();
+            assert(d_frag.IsInnerVertex(u));
+            if (d_current_active_map.Exist(u)) {
+              d_depth[v] = next_depth;
+              d_next_active_map.Insert(v);
+              d_visited.Insert(v);
+              break;
+            }
+          }
+        });
+      }
+    }
+
+    auto has_work = next_active_map.Count(stream);
     stream.Sync();
-    auto local_out_size = out_q_local.size(stream);
-#ifdef PROFILING
-    traversal_kernel_time = grape::GetCurrentTime() - traversal_kernel_time;
-    VLOG(2) << "Frag " << frag.fid() << " Local out: " << local_out_size
-            << " ProcessMsg time: " << process_msg_time * 1000
-            << " Kernel time: " << traversal_kernel_time * 1000;
-    ctx.traversal_kernel_time += traversal_kernel_time;
-#endif
-    in_q.Clear(stream);
-    out_q_local.Swap(in_q);
-    ctx.curr_depth = next_depth;
-    if (local_out_size > 0) {
+
+    if (has_work > 0) {
       messages.ForceContinue();
     }
+
+    ctx.curr_depth = next_depth;
+    current_active_map.Swap(next_active_map);
   }
 };
 }  // namespace cuda
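The substance of this change is the push/pull switch in `IncEval`: direction-optimizing BFS in the style of Beamer et al., where a sparse frontier pushes along outgoing edges and a large frontier makes it cheaper for unvisited vertices to pull from incoming ones. A standalone restatement of the decision rule (a sketch; the helper name is mine, and the 2.5 factor simply mirrors the literal in the diff):

```cpp
#include <cstddef>

// active  = |current frontier|, visited = |vertices reached so far|,
// ivnum   = |inner vertices of this fragment|.
bool UsePush(size_t active, size_t visited, size_t ivnum) {
  double active_ratio = static_cast<double>(active) / ivnum;
  double visited_ratio = static_cast<double>(visited) / ivnum;
  // Push while the frontier is small relative to the unexplored remainder.
  // Also push when the frontier is empty: the push branch then does no edge
  // work at all, whereas pull would still scan every unvisited vertex.
  return 2.5 * active_ratio < (1.0 - visited_ratio) || active == 0;
}
```

Worked numbers: with `ivnum` = 1,000,000, `active` = 200,000 and `visited` = 300,000, the test is 2.5 × 0.2 = 0.5 < 0.7, so the round pushes. At `active` = 300,000 and `visited` = 500,000 it is 0.75 < 0.5, which fails, so the round pulls: each unvisited inner vertex scans its incoming inner-vertex neighbors (outgoing ones on undirected fragments, where the lists coincide) and stops at the first neighbor found in `current_active_map`.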