Skip to content

Commit f0ad8d2

Browse files
Merge pull request LinkedInAttic#37 from matthayes/master
Add a COALESCE UDF
2 parents 98e7ae2 + c622215 commit f0ad8d2

File tree

6 files changed

+261
-0
lines changed

6 files changed

+261
-0
lines changed

Diff for: src/java/datafu/pig/util/COALESCE.java

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package datafu.pig.util;
2+
3+
import java.io.IOException;
4+
5+
import org.apache.pig.EvalFunc;
6+
import org.apache.pig.data.DataType;
7+
import org.apache.pig.data.Tuple;
8+
import org.apache.pig.impl.logicalLayer.schema.Schema;
9+
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
10+
11+
/**
12+
* Returns the first non-null value from a tuple.
13+
* <p>
14+
* Similar to {@link <a href="http://msdn.microsoft.com/en-us/library/ms190349.aspx" target="_blank">COALESCE</a>} in T-SQL.
15+
* </p>
16+
*
17+
* <p>
18+
* Example:
19+
* <pre>
20+
* {@code
21+
*
22+
* define COALESCE datafu.pig.util.COALESCE();
23+
24+
* -- input: 1,2,3,NULL,4,NULL,5
25+
* input = LOAD 'input' AS (val:int);
26+
*
27+
* -- produces: 1,2,3,99,4,99,5
28+
* coalesced = FOREACH input GENERATE COALESCE(val,99);
29+
*
30+
* }</pre></p>
31+
*
32+
* @author "Matthew Hayes <[email protected]>"
33+
*
34+
*/
35+
public class COALESCE extends EvalFunc<Object>
36+
{
37+
@Override
38+
public Object exec(Tuple input) throws IOException
39+
{
40+
41+
if (input == null || input.size() == 0)
42+
{
43+
return null;
44+
}
45+
46+
for (Object o : input)
47+
{
48+
if (o != null)
49+
{
50+
return o;
51+
}
52+
}
53+
54+
return null;
55+
}
56+
57+
@Override
58+
public Schema outputSchema(Schema input)
59+
{
60+
if (input.getFields().size() == 0)
61+
{
62+
throw new RuntimeException("Expected at least one parameter");
63+
}
64+
65+
Byte firstType = null;
66+
int pos = 0;
67+
for (FieldSchema field : input.getFields())
68+
{
69+
if (DataType.isSchemaType(field.type))
70+
{
71+
throw new RuntimeException(String.format("Not supported on schema types. Found %s in position %d.",DataType.findTypeName(field.type),pos));
72+
}
73+
74+
if (DataType.isComplex(field.type))
75+
{
76+
throw new RuntimeException(String.format("Not supported on complex types. Found %s in position %d.",DataType.findTypeName(field.type),pos));
77+
}
78+
79+
if (!DataType.isUsableType(field.type))
80+
{
81+
throw new RuntimeException(String.format("Not a usable type. Found %s in position %d.",DataType.findTypeName(field.type),pos));
82+
}
83+
84+
if (firstType == null)
85+
{
86+
firstType = field.type;
87+
}
88+
else if (!firstType.equals(field.type))
89+
{
90+
throw new RuntimeException(String.format("Expected all types to be equal, but found %s in position %d. First element has type %s.",
91+
DataType.findTypeName(field.type),pos,DataType.findTypeName((byte)firstType)));
92+
}
93+
94+
pos++;
95+
}
96+
97+
return new Schema(new Schema.FieldSchema("item",firstType));
98+
}
99+
}

Diff for: test/pig/datafu/test/pig/util/CoalesceTests.java

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package datafu.test.pig.util;
2+
3+
import java.util.List;
4+
5+
import junit.framework.Assert;
6+
7+
import org.apache.pig.data.Tuple;
8+
import org.apache.pig.impl.logicalLayer.FrontendException;
9+
import org.apache.pig.pigunit.PigTest;
10+
import org.testng.annotations.Test;
11+
12+
import datafu.test.pig.PigTests;
13+
14+
public class CoalesceTests extends PigTests
15+
{
16+
@Test
17+
public void coalesceIntTest() throws Exception
18+
{
19+
PigTest test = createPigTest("test/pig/datafu/test/pig/util/coalesceTest.pig");
20+
21+
this.writeLinesToFile("input", "1,1,2,3",
22+
"2,,2,3",
23+
"3,,,3",
24+
"4,,,",
25+
"5,1,,3",
26+
"6,1,,");
27+
28+
test.runScript();
29+
30+
List<Tuple> lines = this.getLinesForAlias(test, "data3");
31+
32+
Assert.assertEquals(6, lines.size());
33+
for (Tuple t : lines)
34+
{
35+
switch((Integer)t.get(0))
36+
{
37+
case 1:
38+
Assert.assertEquals(1, t.get(1)); break;
39+
case 2:
40+
Assert.assertEquals(2, t.get(1)); break;
41+
case 3:
42+
Assert.assertEquals(3, t.get(1)); break;
43+
case 4:
44+
Assert.assertEquals(null, t.get(1)); break;
45+
case 5:
46+
Assert.assertEquals(1, t.get(1)); break;
47+
case 6:
48+
Assert.assertEquals(1, t.get(1)); break;
49+
default:
50+
Assert.fail("Did not expect: " + t.get(1));
51+
}
52+
}
53+
}
54+
@Test
55+
public void coalesceLongTest() throws Exception
56+
{
57+
PigTest test = createPigTest("test/pig/datafu/test/pig/util/coalesceLongTest.pig");
58+
59+
this.writeLinesToFile("input", "1,5",
60+
"2,");
61+
62+
test.runScript();
63+
64+
List<Tuple> lines = this.getLinesForAlias(test, "data4");
65+
66+
Assert.assertEquals(2, lines.size());
67+
for (Tuple t : lines)
68+
{
69+
switch((Integer)t.get(0))
70+
{
71+
case 1:
72+
Assert.assertEquals(500L, t.get(1)); break;
73+
case 2:
74+
Assert.assertEquals(10000L, t.get(1)); break;
75+
default:
76+
Assert.fail("Did not expect: " + t.get(1));
77+
}
78+
}
79+
}
80+
81+
@Test(expectedExceptions=FrontendException.class)
82+
public void coalesceDiffTypesTest() throws Exception
83+
{
84+
PigTest test = createPigTest("test/pig/datafu/test/pig/util/coalesceDiffTypesTest.pig");
85+
86+
this.writeLinesToFile("input", "1,1,2.0");
87+
88+
test.runScript();
89+
90+
this.getLinesForAlias(test, "data3");
91+
}
92+
93+
@Test(expectedExceptions=FrontendException.class)
94+
public void coalesceBagTypeTest() throws Exception
95+
{
96+
PigTest test = createPigTest("test/pig/datafu/test/pig/util/coalesceBagTypeTest.pig");
97+
98+
this.writeLinesToFile("input", "1,1,{(2)}");
99+
100+
test.runScript();
101+
102+
this.getLinesForAlias(test, "data3");
103+
}
104+
}
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
register $JAR_PATH
2+
3+
define COALESCE datafu.pig.util.COALESCE();
4+
5+
data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2: bag {tuple(aVal:int)});
6+
7+
data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2) as result;
8+
9+
describe data2;
10+
11+
data3 = FOREACH data2 GENERATE testcase, result;
12+
13+
STORE data3 INTO 'output';
14+
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
register $JAR_PATH
2+
3+
define COALESCE datafu.pig.util.COALESCE();
4+
5+
data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:DOUBLE);
6+
7+
data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2) as result;
8+
9+
describe data2;
10+
11+
data3 = FOREACH data2 GENERATE testcase, result;
12+
13+
STORE data3 INTO 'output';
14+

Diff for: test/pig/datafu/test/pig/util/coalesceLongTest.pig

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
register $JAR_PATH
2+
3+
define COALESCE datafu.pig.util.COALESCE();
4+
5+
data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
6+
7+
data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result;
8+
9+
describe data2;
10+
11+
data3 = FOREACH data2 GENERATE testcase, result;
12+
13+
data4 = FOREACH data3 GENERATE testcase, result*100 as result;
14+
15+
STORE data4 INTO 'output';
16+

Diff for: test/pig/datafu/test/pig/util/coalesceTest.pig

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
register $JAR_PATH
2+
3+
define COALESCE datafu.pig.util.COALESCE();
4+
5+
data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
6+
7+
data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2,val3) as result;
8+
9+
describe data2;
10+
11+
data3 = FOREACH data2 GENERATE testcase, result;
12+
13+
STORE data3 INTO 'output';
14+

0 commit comments

Comments
 (0)