@@ -12,7 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-
 #ifndef EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_H_
 #define EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_H_
 
@@ -23,177 +22,189 @@ limitations under the License.
 namespace grape {
 
 /**
- * @brief An implementation of PageRank, the version in LDBC, which can work
- * on both directed and undirected graphs.
+ * @brief An implementation of PageRank, which can work
+ * on undirected graphs.
  *
- * This version of PageRank inherits ParallelAppBase. Messages can be sent in
- * parallel with the evaluation process. This strategy improves performance by
- * overlapping the communication time and the evaluation time.
+ * This version of PageRank inherits BatchShuffleAppBase.
+ * Messages are generated in batches and received in-place.
  *
  * @tparam FRAG_T
  */
-
 template <typename FRAG_T>
-class PageRank : public ParallelAppBase<FRAG_T, PageRankContext<FRAG_T>>,
-                 public Communicator,
-                 public ParallelEngine {
+class PageRank
+    : public BatchShuffleAppBase<FRAG_T, PageRankContext<FRAG_T>>,
+      public ParallelEngine,
+      public Communicator {
  public:
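+  // Assumed semantics, by analogy with INSTALL_PARALLEL_WORKER: this macro
+  // binds the app to the batch shuffle worker and its message manager, which
+  // exchange whole vertex arrays between fragments rather than per-vertex
+  // messages.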
+  INSTALL_BATCH_SHUFFLE_WORKER(PageRank<FRAG_T>,
+                               PageRankContext<FRAG_T>, FRAG_T)
+
   using vertex_t = typename FRAG_T::vertex_t;
+  using vid_t = typename FRAG_T::vid_t;
+
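+  // need_split_edges asks the fragment to keep inner- and outer-neighbor
+  // adjacency lists separately, so IncEval can consume inner edges while
+  // remote message batches are still in flight.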
+  static constexpr bool need_split_edges = true;
 
   static constexpr MessageStrategy message_strategy =
       MessageStrategy::kAlongOutgoingEdgeToOuterVertex;
-  static constexpr bool need_split_edges = true;
-  static constexpr LoadStrategy load_strategy = LoadStrategy::kBothOutIn;
+  static constexpr LoadStrategy load_strategy = LoadStrategy::kOnlyOut;
 
-  INSTALL_PARALLEL_WORKER(PageRank<FRAG_T>, PageRankContext<FRAG_T>, FRAG_T)
+  PageRank() = default;
 
-  PageRank() {}
 
   void PEval(const fragment_t& frag, context_t& ctx,
              message_manager_t& messages) {
     auto inner_vertices = frag.InnerVertices();
 
-    size_t graph_vnum = frag.GetTotalVerticesNum();
-    messages.InitChannels(thread_num());
-
 #ifdef PROFILING
     ctx.exec_time -= GetCurrentTime();
 #endif
 
     ctx.step = 0;
-    double p = 1.0 / graph_vnum;
-
-    // assign initial ranks
-    ForEach(inner_vertices, [&ctx, &frag, p, &messages](int tid, vertex_t u) {
-      int EdgeNum = frag.GetOutgoingAdjList(u).Size();
-      ctx.degree[u] = EdgeNum;
-      if (EdgeNum > 0) {
-        ctx.result[u] = p / EdgeNum;
-        messages.SendMsgThroughOEdges<fragment_t, double>(frag, u,
-                                                          ctx.result[u], tid);
-      } else {
-        ctx.result[u] = p;
-      }
-    });
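+    // Every vertex starts with rank p = 1/N, stored pre-divided by its
+    // out-degree so that neighbors can simply sum the stored values.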
+    ctx.graph_vnum = frag.GetTotalVerticesNum();
+    vid_t dangling_vnum = 0;
+    double p = 1.0 / ctx.graph_vnum;
+
+    std::vector<vid_t> dangling_vnum_tid(thread_num(), 0);
+    ForEach(inner_vertices,
+            [&ctx, &frag, p, &dangling_vnum_tid](int tid, vertex_t u) {
+              int EdgeNum = frag.GetLocalOutDegree(u);
+              ctx.degree[u] = EdgeNum;
+              if (EdgeNum > 0) {
+                ctx.result[u] = p / EdgeNum;
+              } else {
+                ++dangling_vnum_tid[tid];
+                ctx.result[u] = p;
+              }
+            });
+
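+    // Fold the per-thread dangling counts into a single local count, then
+    // reduce across all workers with Sum() from Communicator.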
79
+    for (auto vn : dangling_vnum_tid) {
+      dangling_vnum += vn;
+    }
+
+    Sum(dangling_vnum, ctx.total_dangling_vnum);
+    ctx.dangling_sum = p * ctx.total_dangling_vnum;
 
 #ifdef PROFILING
     ctx.exec_time += GetCurrentTime();
     ctx.postprocess_time -= GetCurrentTime();
 #endif
 
-    for (auto u : inner_vertices) {
-      if (ctx.degree[u] == 0) {
-        ++ctx.dangling_vnum;
-      }
-    }
-
-    double dangling_sum = p * static_cast<double>(ctx.dangling_vnum);
-
-    Sum(dangling_sum, ctx.dangling_sum);
-
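+    // One batch shuffle: copy the initial inner-vertex ranks to their
+    // mirrors on all other fragments.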
91
+    messages.SyncInnerVertices<fragment_t, double>(frag, ctx.result,
+                                                   thread_num());
 
 #ifdef PROFILING
     ctx.postprocess_time += GetCurrentTime();
 #endif
-    messages.ForceContinue();
   }
98
98
void IncEval (const fragment_t & frag, context_t & ctx,
99
99
message_manager_t & messages) {
100
100
auto inner_vertices = frag.InnerVertices ();
101
-
102
- double dangling_sum = ctx.dangling_sum ;
103
-
104
- size_t graph_vnum = frag.GetTotalVerticesNum ();
105
-
106
101
++ctx.step ;
107
- if (ctx.step > ctx.max_round ) {
108
- return ;
109
- }
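+
+    // base is the rank share every vertex receives this round: the random
+    // jump (1 - delta) / N plus the redistributed dangling mass
+    // delta * dangling_sum / N.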
102
+    double base = (1.0 - ctx.delta) / ctx.graph_vnum +
+                  ctx.delta * ctx.dangling_sum / ctx.graph_vnum;
+    ctx.dangling_sum = base * ctx.total_dangling_vnum;
 
 #ifdef PROFILING
     ctx.exec_time -= GetCurrentTime();
 #endif
 
-    double base =
-        (1.0 - ctx.delta) / graph_vnum + ctx.delta * dangling_sum / graph_vnum;
-
-    // pull ranks from neighbors
-    ForEach(inner_vertices, [&ctx, base, &frag](int tid, vertex_t u) {
-      if (ctx.degree[u] == 0) {
-        ctx.next_result[u] = base;
-      } else {
111
+    if (ctx.avg_degree > 10 && frag.fnum() > 1) {
+      // If the fragment is dense and there are multiple fragments, receiving
+      // messages is overlapped with computation. The receiving and computing
+      // procedures are split into multiple rounds. In each round, messages
+      // from one fragment are received and then processed.
+      ForEach(inner_vertices, [&ctx, &frag](int tid, vertex_t u) {
         double cur = 0;
-        auto es = frag.GetIncomingInnerVertexAdjList(u);
+        auto es = frag.GetOutgoingInnerVertexAdjList(u);
         for (auto& e : es) {
           cur += ctx.result[e.neighbor];
         }
         ctx.next_result[u] = cur;
-      }
-    });
+      });
+
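+      // Consume fnum() - 2 of the remaining message batches here; the last
+      // batch is handled after this loop, where damping and the division by
+      // out-degree are folded into the same pass.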
+      for (fid_t i = 2; i < frag.fnum(); ++i) {
 #ifdef PROFILING
-    ctx.exec_time += GetCurrentTime();
-    ctx.preprocess_time -= GetCurrentTime();
+        ctx.preprocess_time -= GetCurrentTime();
 #endif
-
-    // process received ranks sent by other workers
-    {
-      messages.ParallelProcess<fragment_t, double>(
-          thread_num(), frag, [&ctx](int tid, vertex_t u, const double& msg) {
-            ctx.result[u] = msg;
-          });
-    }
-
+        fid_t src_fid = messages.UpdatePartialOuterVertices();
 #ifdef PROFILING
-    ctx.preprocess_time += GetCurrentTime();
-    ctx.exec_time -= GetCurrentTime();
+        ctx.preprocess_time += GetCurrentTime();
+        ctx.exec_time -= GetCurrentTime();
 #endif
-
-    // compute new ranks and send messages
-    if (ctx.step != ctx.max_round) {
-      ForEach(inner_vertices,
-              [&ctx, base, &frag, &messages](int tid, vertex_t u) {
-                if (ctx.degree[u] != 0) {
-                  double cur = ctx.next_result[u];
-                  auto es = frag.GetIncomingOuterVertexAdjList(u);
-                  for (auto& e : es) {
-                    cur += ctx.result[e.neighbor];
-                  }
-                  cur = (ctx.delta * cur + base) / ctx.degree[u];
-                  ctx.next_result[u] = cur;
-                  messages.SendMsgThroughOEdges<fragment_t, double>(
-                      frag, u, ctx.next_result[u], tid);
-                }
-              });
-    } else {
-      ForEach(inner_vertices, [&ctx, base, &frag](int tid, vertex_t u) {
-        if (ctx.degree[u] != 0) {
+        ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
           double cur = ctx.next_result[u];
-          auto es = frag.GetIncomingOuterVertexAdjList(u);
+          auto es = frag.GetOutgoingAdjList(u, src_fid);
           for (auto& e : es) {
            cur += ctx.result[e.neighbor];
          }
-          cur = (ctx.delta * cur + base) / ctx.degree[u];
          ctx.next_result[u] = cur;
+        });
+#ifdef PROFILING
+        ctx.exec_time += GetCurrentTime();
+#endif
+      }
+
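+      // Last batch: add the final fragment's ranks and apply the damping
+      // factor, base term, and out-degree division in the same pass.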
147
+#ifdef PROFILING
+      ctx.preprocess_time -= GetCurrentTime();
+#endif
+      fid_t src_fid = messages.UpdatePartialOuterVertices();
+#ifdef PROFILING
+      ctx.preprocess_time += GetCurrentTime();
+      ctx.exec_time -= GetCurrentTime();
+#endif
+      ForEach(
+          inner_vertices, [src_fid, &frag, &ctx, base](int tid, vertex_t u) {
+            double cur = ctx.next_result[u];
+            auto es = frag.GetOutgoingAdjList(u, src_fid);
+            for (auto& e : es) {
+              cur += ctx.result[e.neighbor];
+            }
+            int en = frag.GetLocalOutDegree(u);
+            ctx.next_result[u] = en > 0 ? (ctx.delta * cur + base) / en : base;
+          });
+#ifdef PROFILING
+      ctx.exec_time += GetCurrentTime();
+#endif
+    } else {
+      // If the fragment is sparse or there is only one fragment, a single
+      // pass over the inner vertices is preferred.
+#ifdef PROFILING
+      ctx.preprocess_time -= GetCurrentTime();
+#endif
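+      // UpdateOuterVertices() is assumed to block until the message batches
+      // from all fragments have been written into ctx.result.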
174
+      messages.UpdateOuterVertices();
+#ifdef PROFILING
+      ctx.preprocess_time += GetCurrentTime();
+      ctx.exec_time -= GetCurrentTime();
+#endif
+      ForEach(inner_vertices, [&ctx, &frag, base](int tid, vertex_t u) {
+        double cur = 0;
+        auto es = frag.GetOutgoingAdjList(u);
+        for (auto& e : es) {
+          cur += ctx.result[e.neighbor];
         }
+        int en = frag.GetLocalOutDegree(u);
+        ctx.next_result[u] = en > 0 ? (ctx.delta * cur + base) / en : base;
       });
+#ifdef PROFILING
+      ctx.exec_time += GetCurrentTime();
+#endif
     }
 
 #ifdef PROFILING
-    ctx.exec_time += GetCurrentTime();
     ctx.postprocess_time -= GetCurrentTime();
 #endif
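+    // Shuffle the new ranks only if another round follows; after the final
+    // round the results stay local.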
+    if (ctx.step != ctx.max_round) {
+      messages.SyncInnerVertices<fragment_t, double>(frag, ctx.next_result,
+                                                     thread_num());
+    }
 
     ctx.result.Swap(ctx.next_result);
-
-    double new_dangling = base * static_cast<double>(ctx.dangling_vnum);
-
-    Sum(new_dangling, ctx.dangling_sum);
-
 #ifdef PROFILING
     ctx.postprocess_time += GetCurrentTime();
 #endif
-    messages.ForceContinue();
   }
 };
 
 }  // namespace grape
+
 #endif  // EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_H_