TweetCount.java
package com.miguno.avro.hadoop;

import com.miguno.avro.Tweet;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import java.io.IOException;

/**
 * This MapReduce job counts the number of tweets created by Twitter users.
 * <p/>
 * It uses AvroMapper and AvroReducer to implement the Map and Reduce steps, respectively. The output will be stored
 * in Snappy-compressed Avro format.
 * <p/>
 * When you run this class you must supply it with two parameters: first, the path to the input data; second, the
 * path where the output data should be stored.
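 * <p/>
 * Example invocation (illustrative only; the jar name and HDFS paths below are assumptions, not documented usage):
 * <pre>
 * $ hadoop jar avro-hadoop-starter.jar com.miguno.avro.hadoop.TweetCount \
 *       /examples/input/tweets.avro /examples/output/tweetcount
 * </pre>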
 */
public class TweetCount extends Configured implements Tool {

    private static final Logger LOG = Logger.getLogger(TweetCount.class);

    private static final String SNAPPY_CODEC = "snappy";

    @Override
    public int run(String[] args) throws Exception {
        if (!hasValidCommandLineParameters(args)) {
            return ExitCode.ERROR_ILLEGAL_CLI_ARGUMENTS.intValue();
        }
        Path inputPath = extractInputPathFromArgs(args);
        Path outputPath = extractOutputPathFromArgs(args);
        JobConf conf = createJobConfiguration(inputPath, outputPath);
        RunningJob job = runAndWaitForJobToFinish(conf);
        ExitCode exitCode = deriveExitCodeBasedOnJobSuccess(job);
        return exitCode.intValue();
    }

    private boolean hasValidCommandLineParameters(String[] args) {
        if (args.length != 2) {
            LOG.error(String.format("Usage: %s <input path> <output path>", getClass().getSimpleName()));
            return false;
        }
        else {
            return true;
        }
    }

    private Path extractInputPathFromArgs(String[] args) {
        return new Path(args[0]);
    }

    private Path extractOutputPathFromArgs(String[] args) {
        return new Path(args[1]);
    }

    private JobConf createJobConfiguration(Path inputPath, Path outputPath) {
        JobConf conf = new JobConf(TweetCount.class);
        conf.setJobName("tweetcount");
        // AvroJob#setInputSchema() and AvroJob#setOutputSchema() set relevant config options such as input/output
        // format, map output classes, and output key class.
        AvroJob.setInputSchema(conf, Tweet.getClassSchema());
        AvroJob.setOutputSchema(conf, Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.INT)));
        AvroJob.setMapperClass(conf, TweetCountMapper.class);
        AvroJob.setReducerClass(conf, TweetCountReducer.class);
        AvroJob.setOutputCodec(conf, SNAPPY_CODEC);
        FileInputFormat.setInputPaths(conf, inputPath);
        FileOutputFormat.setOutputPath(conf, outputPath);
        return conf;
    }

    private RunningJob runAndWaitForJobToFinish(JobConf conf) throws IOException {
        // JobClient.runJob() blocks until the job has finished, so the extra waitForCompletion() call below is
        // effectively redundant.
        RunningJob job = JobClient.runJob(conf);
        job.waitForCompletion();
        return job;
    }

    private ExitCode deriveExitCodeBasedOnJobSuccess(RunningJob job) throws IOException {
        if (job.isSuccessful()) {
            return ExitCode.SUCCESS;
        }
        else {
            return ExitCode.ERROR_JOB_FAILED;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new TweetCount(), args);
        System.exit(exitCode);
    }

}
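
/*
 * Note: TweetCountMapper and TweetCountReducer are defined in separate source files of this package and are not
 * shown here. The sketch below is illustrative only and is not the project's actual implementation; it shows what
 * a mapper/reducer pair matching the Pair(STRING, INT) output schema configured above might look like, assuming
 * the Avro-generated Tweet record exposes a getUsername() accessor (it would also need imports for
 * org.apache.avro.mapred.AvroMapper, AvroReducer, AvroCollector and org.apache.hadoop.mapred.Reporter):
 *
 *   class TweetCountMapper extends AvroMapper<Tweet, Pair<CharSequence, Integer>> {
 *       @Override
 *       public void map(Tweet tweet, AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter)
 *               throws IOException {
 *           // Emit one (username, 1) pair per input tweet.
 *           collector.collect(new Pair<CharSequence, Integer>(tweet.getUsername(), 1));
 *       }
 *   }
 *
 *   class TweetCountReducer extends AvroReducer<CharSequence, Integer, Pair<CharSequence, Integer>> {
 *       @Override
 *       public void reduce(CharSequence username, Iterable<Integer> counts,
 *                          AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter)
 *               throws IOException {
 *           // Sum the per-tweet counts emitted by the mapper for this user.
 *           int total = 0;
 *           for (int count : counts) {
 *               total += count;
 *           }
 *           collector.collect(new Pair<CharSequence, Integer>(username, total));
 *       }
 *   }
 */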