-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPageRankReducer.java
59 lines (39 loc) · 1.63 KB
/
PageRankReducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package bigdata;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer for one PageRank iteration.
 *
 * <p>For each node (key) it receives a mixed bag of {@link ObjectWritable} values:
 * exactly one {@code PageRank} record carrying the node's graph structure, plus zero
 * or more {@code DoubleWritable} rank contributions from in-links. It sums the
 * damped contributions (BETA * contribution) and adds the teleport/leaked-mass
 * correction (1 - S) / N, where S ("sumOfPageRanks") is produced by the companion
 * S_Calculation_Reducer job and N is the total node count from a job counter.
 * Emits (node, serialized PageRank record) as Text.
 */
public class PageRankReducer extends Reducer<Text, ObjectWritable, Text, Text> {

    /** Total number of nodes N, read from the driver-maintained counter in setup(). */
    private long totalNodes;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Look up the running job to read its counters. Close the Cluster handle
        // afterwards — it holds an RPC client that would otherwise leak per task.
        Cluster cluster = new Cluster(conf);
        try {
            Job currentJob = cluster.getJob(context.getJobID());
            totalNodes = currentJob.getCounters()
                    .findCounter(PageRankDriver.Counter.totalNodes).getValue();
        } finally {
            cluster.close();
        }
    }

    @Override
    protected void reduce(Text key, Iterable<ObjectWritable> values, Context context)
            throws IOException, InterruptedException {
        PageRank p = null;        // the node's graph record (adjacency info)
        double pageRankValue = 0; // accumulated, damped rank contributions

        for (ObjectWritable object : values) {
            if (object.get() instanceof PageRank) {
                p = (PageRank) object.get(); // get the graph
            } else { // otherwise the class is DoubleWritable
                pageRankValue += PageRankDriver.BETA * ((DoubleWritable) object.get()).get(); // BETA=0.8
            }
        }

        // Teleport / leaked-mass correction: (1 - S) / N. sumOfPageRanks is S, which
        // comes from the S_Calculation_Reducer (the other MapReduce job) via the config.
        pageRankValue += (1 - context.getConfiguration().getDouble("sumOfPageRanks", -1)) / totalNodes;

        // Guard against malformed input: if no PageRank record arrived for this key
        // we cannot emit the node — skip it instead of crashing the task with an NPE.
        if (p == null) {
            return;
        }
        p.setPageRankValue(pageRankValue);
        context.write(new Text(p.getNode()), new Text(p.toString()));
    }
} // end of Reduce class