Skip to content

Commit 1af3093

Browse files
author
Matt Hayes
committed
Fix to correctly load schema for input paths
The map from input path to schema was not set up correctly in the record reader, so the schema wasn't being loaded as a result and instead was null. This corrects the problem and also adds verification that the schema can be determined. This also fixes an issue where only schemas for the most recent day of input would be loaded. This could cause problems for joins, as data for the most recent day may not be available across all inputs. As a result these missing inputs did not have schemas loaded.
1 parent e1f6850 commit 1af3093

9 files changed

+192
-42
lines changed

Diff for: contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,18 @@
3636
* container files store only records (not key/value pairs), the value from
3737
* this InputFormat is a NullWritable.</p>
3838
*/
39-
public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
39+
public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable>
40+
{
4041
/** {@inheritDoc} */
4142
@Override
4243
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
4344
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
4445
{
45-
Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
46+
Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
47+
if (readerSchema == null)
48+
{
49+
throw new RuntimeException("Could not determine input schema");
50+
}
4651
return new AvroKeyRecordReader<T>(readerSchema);
4752
}
4853
}

Diff for: contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.mapreduce.InputSplit;
2222
import org.apache.hadoop.mapreduce.Job;
2323
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
24+
import org.apache.log4j.Logger;
2425
import org.json.JSONException;
2526
import org.json.JSONObject;
2627

@@ -34,6 +35,8 @@
3435
*/
3536
public class AvroMultipleInputsUtil
3637
{
38+
private static final Logger _log = Logger.getLogger(AvroMultipleInputsUtil.class);
39+
3740
private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.multiple.inputs.keys";
3841

3942
/**
@@ -46,6 +49,7 @@ public class AvroMultipleInputsUtil
4649
public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit split)
4750
{
4851
String path = ((FileSplit)split).getPath().toString();
52+
_log.info("Determining schema for input path " + path);
4953
JSONObject schemas;
5054
try
5155
{
@@ -55,16 +59,18 @@ public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit sp
5559
{
5660
throw new RuntimeException(e1);
5761
}
58-
String schemaString = null;
62+
Schema schema = null;
5963
if (schemas != null)
6064
{
6165
for (String key : JSONObject.getNames(schemas))
6266
{
67+
_log.info("Checking against " + key);
6368
if (path.startsWith(key))
6469
{
6570
try
6671
{
67-
schemaString = schemas.getString(key);
72+
schema = new Schema.Parser().parse(schemas.getString(key));
73+
_log.info("Input schema found: " + schema.toString(true));
6874
break;
6975
}
7076
catch (JSONException e)
@@ -74,7 +80,11 @@ public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit sp
7480
}
7581
}
7682
}
77-
return schemaString != null ? new Schema.Parser().parse(schemaString) : null;
83+
if (schema == null)
84+
{
85+
_log.info("Could not determine input schema");
86+
}
87+
return schema;
7888
}
7989

8090
/**

Diff for: contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class PathUtils
5656
public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat("yyyyMMdd");
5757
public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat("yyyy/MM/dd");
5858
private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");
59-
private static final Pattern dailyPathPattern = Pattern.compile(".+/(\\d{4}/\\d{2}/\\d{2})");
59+
private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");
6060

6161
/**
6262
* Filters out paths starting with "." and "_".
@@ -144,7 +144,7 @@ public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) thr
144144
Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
145145
if (matcher.matches())
146146
{
147-
String datePath = matcher.group(1);
147+
String datePath = matcher.group(2);
148148
Date date;
149149
try
150150
{
@@ -299,11 +299,30 @@ public static Date getDateForNestedDatedPath(Path path)
299299

300300
try
301301
{
302-
return PathUtils.nestedDatedPathFormat.parse(matcher.group(1));
302+
return PathUtils.nestedDatedPathFormat.parse(matcher.group(2));
303303
}
304304
catch (ParseException e)
305305
{
306306
throw new RuntimeException("Unexpected input filename: " + path);
307307
}
308308
}
309+
310+
/**
311+
* Gets the root path for a path in the "yyyy/MM/dd" format. This is part of the path preceding the
312+
* "yyyy/MM/dd" portion.
313+
*
314+
* @param path
315+
* @return
316+
*/
317+
public static Path getNestedPathRoot(Path path)
318+
{
319+
Matcher matcher = dailyPathPattern.matcher(path.toString());
320+
321+
if (!matcher.matches())
322+
{
323+
throw new RuntimeException("Unexpected input filename: " + path);
324+
}
325+
326+
return new Path(matcher.group(1));
327+
}
309328
}

Diff for: contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Map.Entry;
2728
import java.util.Properties;
2829
import java.util.SortedMap;
2930

3031
import org.apache.avro.Schema;
3132
import org.apache.hadoop.fs.FileSystem;
33+
import org.apache.hadoop.fs.Path;
3234
import org.apache.log4j.Logger;
3335

3436

@@ -80,7 +82,7 @@ public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
8082
private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
8183
private List<DatePath> _newInputsToProcess = new ArrayList<DatePath>();
8284
private List<DatePath> _oldInputsToProcess = new ArrayList<DatePath>();
83-
private Map<Date,List<DatePath>> _inputsToProcessByDate = new HashMap<Date,List<DatePath>>();
85+
private Map<String,String> _latestInputByPath = new HashMap<String,String>();
8486
private DatePath _previousOutputToProcess;
8587
private List<Schema> _inputSchemas = new ArrayList<Schema>();
8688
private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
@@ -282,17 +284,17 @@ private void determineNumReducers() throws IOException
282284
*/
283285
private void determineInputSchemas() throws IOException
284286
{
285-
List<Date> dates = new ArrayList<Date>(_inputsToProcessByDate.keySet());
286-
if (dates.size() > 0)
287+
if (_latestInputByPath.size() > 0)
287288
{
288-
Collections.sort(dates);
289-
Date lastDate = dates.get(dates.size()-1);
290-
List<DatePath> lastInputs = _inputsToProcessByDate.get(lastDate);
291-
for (DatePath input : lastInputs)
289+
_log.info("Determining input schemas");
290+
for (Entry<String,String> entry : _latestInputByPath.entrySet())
292291
{
293-
Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),input.getPath());
292+
String root = entry.getKey();
293+
String input = entry.getValue();
294+
_log.info("Loading schema for " + input);
295+
Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
294296
_inputSchemas.add(schema);
295-
_inputSchemasByPath.put(input.getPath().toString(), schema);
297+
_inputSchemasByPath.put(root, schema);
296298
}
297299
}
298300
}
@@ -331,7 +333,7 @@ private void determineInputsToProcess() throws IOException
331333
Calendar cal = Calendar.getInstance(PathUtils.timeZone);
332334

333335
_inputsToProcess.clear();
334-
_inputsToProcessByDate.clear();
336+
_latestInputByPath.clear();
335337
_previousOutputToProcess = null;
336338

337339
DateRange outputDateRange = null;
@@ -359,10 +361,11 @@ private void determineInputsToProcess() throws IOException
359361
_log.info(String.format("Input: %s",input.getPath()));
360362
_inputsToProcess.add(input);
361363
_oldInputsToProcess.add(input);
364+
365+
Path root = PathUtils.getNestedPathRoot(input.getPath());
366+
_latestInputByPath.put(root.toString(), input.getPath().toString());
362367
}
363-
364-
_inputsToProcessByDate.put(currentDate, inputs);
365-
368+
366369
cal.setTime(currentDate);
367370
cal.add(Calendar.DAY_OF_MONTH, 1);
368371
currentDate = cal.getTime();
@@ -414,10 +417,11 @@ private void determineInputsToProcess() throws IOException
414417
_log.info(String.format("Input: %s",input.getPath()));
415418
_inputsToProcess.add(input);
416419
_newInputsToProcess.add(input);
420+
421+
Path root = PathUtils.getNestedPathRoot(input.getPath());
422+
_latestInputByPath.put(root.toString(), input.getPath().toString());
417423
}
418-
419-
_inputsToProcessByDate.put(currentDate, inputs);
420-
424+
421425
newDataCount++;
422426
}
423427
}

Diff for: contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import java.util.Set;
2929
import java.util.SortedMap;
3030
import java.util.TreeSet;
31+
import java.util.Map.Entry;
3132

3233
import org.apache.avro.Schema;
3334
import org.apache.hadoop.fs.FileSystem;
35+
import org.apache.hadoop.fs.Path;
3436
import org.apache.log4j.Logger;
3537

3638
import datafu.hourglass.fs.DatePath;
@@ -64,7 +66,7 @@ public class PartitionPreservingExecutionPlanner extends ExecutionPlanner
6466
private final Logger _log = Logger.getLogger(PartitionPreservingExecutionPlanner.class);
6567

6668
private SortedMap<Date,DatePath> _outputPathsByDate;
67-
private Map<Date,List<DatePath>> _inputsToProcessByDate = new HashMap<Date,List<DatePath>>();
69+
private Map<String,String> _latestInputByPath = new HashMap<String,String>();
6870
private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
6971
private List<Schema> _inputSchemas = new ArrayList<Schema>();
7072
private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
@@ -208,17 +210,17 @@ private void determineNumReducers() throws IOException
208210
*/
209211
private void determineInputSchemas() throws IOException
210212
{
211-
List<Date> dates = new ArrayList<Date>(_inputsToProcessByDate.keySet());
212-
if (dates.size() > 0)
213+
if (_latestInputByPath.size() > 0)
213214
{
214-
Collections.sort(dates);
215-
Date lastDate = dates.get(dates.size()-1);
216-
List<DatePath> lastInputs = _inputsToProcessByDate.get(lastDate);
217-
for (DatePath input : lastInputs)
215+
_log.info("Determining input schemas");
216+
for (Entry<String,String> entry : _latestInputByPath.entrySet())
218217
{
219-
Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),input.getPath());
218+
String root = entry.getKey();
219+
String input = entry.getValue();
220+
_log.info("Loading schema for " + input);
221+
Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
220222
_inputSchemas.add(schema);
221-
_inputSchemasByPath.put(input.getPath().toString(), schema);
223+
_inputSchemasByPath.put(root, schema);
222224
}
223225
}
224226
}
@@ -231,6 +233,7 @@ private void determineInputSchemas() throws IOException
231233
private void determineInputsToProcess()
232234
{
233235
_log.info("Determining inputs to process");
236+
_latestInputByPath.clear();
234237
int newDataCount = 0;
235238
Calendar cal = Calendar.getInstance(PathUtils.timeZone);
236239
for (Date currentDate=getDateRange().getBeginDate(); currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
@@ -251,10 +254,11 @@ private void determineInputsToProcess()
251254
{
252255
_log.info(String.format("Input: %s",input.getPath()));
253256
_inputsToProcess.add(input);
257+
258+
Path root = PathUtils.getNestedPathRoot(input.getPath());
259+
_latestInputByPath.put(root.toString(), input.getPath().toString());
254260
}
255-
256-
_inputsToProcessByDate.put(currentDate, inputs);
257-
261+
258262
newDataCount++;
259263
}
260264
else

Diff for: contrib/hourglass/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java

+51-1
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,66 @@ public void joinTest() throws IOException, InterruptedException, ClassNotFoundEx
173173
HashMap<Long,ImpressionClick> counts;
174174

175175
counts = loadOutputCounts("20130317");
176-
checkSize(counts,7);
176+
checkSize(counts,10);
177177
checkIdCount(counts,1,1,1);
178178
checkIdCount(counts,2,1,2);
179+
checkIdCount(counts,3,2,0);
179180
checkIdCount(counts,4,1,1);
180181
checkIdCount(counts,5,1,1);
181182
checkIdCount(counts,6,2,1);
183+
checkIdCount(counts,7,2,0);
182184
checkIdCount(counts,8,1,3);
185+
checkIdCount(counts,9,1,0);
183186
checkIdCount(counts,10,1,2);
184187
}
185188

189+
@Test
190+
public void joinMissingClicksTest() throws IOException, InterruptedException, ClassNotFoundException
191+
{
192+
startImpressions(2013, 3, 15);
193+
startClicks(2013, 3, 15);
194+
impression(1); click(1);
195+
impression(2); click(2); click(2);
196+
impression(3); impression(3);
197+
impression(4); click(4);
198+
stopImpressions();
199+
stopClicks();
200+
201+
startImpressions(2013, 3, 16);
202+
startClicks(2013, 3, 16);
203+
impression(5); click(5);
204+
impression(6); impression(6); click(6);
205+
impression(7);
206+
impression(7);
207+
stopImpressions();
208+
stopClicks();
209+
210+
startImpressions(2013, 3, 17);
211+
impression(8);
212+
impression(9);
213+
impression(10);
214+
stopImpressions();
215+
216+
runJob();
217+
218+
checkOutputFolderCount(1);
219+
220+
HashMap<Long,ImpressionClick> counts;
221+
222+
counts = loadOutputCounts("20130317");
223+
checkSize(counts,10);
224+
checkIdCount(counts,1,1,1);
225+
checkIdCount(counts,2,1,2);
226+
checkIdCount(counts,3,2,0);
227+
checkIdCount(counts,4,1,1);
228+
checkIdCount(counts,5,1,1);
229+
checkIdCount(counts,6,2,1);
230+
checkIdCount(counts,7,2,0);
231+
checkIdCount(counts,8,1,0);
232+
checkIdCount(counts,9,1,0);
233+
checkIdCount(counts,10,1,0);
234+
}
235+
186236
private AbstractPartitionCollapsingIncrementalJob runJob() throws IOException, InterruptedException, ClassNotFoundException
187237
{
188238
_props = newTestProperties();

0 commit comments

Comments
 (0)