
Commit 44d05b2

Integration tests for deduplicate and groupby in bq pushdown
1 parent 60119d0 commit 44d05b2

File tree: 3 files changed, +630 −3 lines
@@ -0,0 +1,226 @@
/*
 * Copyright © 2016 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.cdap.app.etl.batch;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.TypeToken;
import io.cdap.cdap.api.Resources;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.table.Put;
import io.cdap.cdap.api.dataset.table.Table;
import io.cdap.cdap.app.etl.ETLTestBase;
import io.cdap.cdap.app.etl.dataset.DatasetAccessApp;
import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService;
import io.cdap.cdap.datapipeline.SmartWorkflow;
import io.cdap.cdap.etl.api.Engine;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
import io.cdap.cdap.etl.proto.v2.ETLPlugin;
import io.cdap.cdap.etl.proto.v2.ETLStage;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.test.ApplicationManager;
import io.cdap.cdap.test.DataSetManager;
import io.cdap.cdap.test.ServiceManager;
import io.cdap.cdap.test.WorkflowManager;
import io.cdap.cdap.test.suite.category.RequiresSpark;
import io.cdap.common.http.HttpMethod;
import io.cdap.common.http.HttpResponse;
import io.cdap.common.http.ObjectResponse;
import io.cdap.plugin.common.Properties;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Tests DedupAggregator.
 */
public class DedupAggregatorTest extends ETLTestBase {
  public static final String SMARTWORKFLOW_NAME = SmartWorkflow.NAME;
  public static final String USER_SOURCE = "userSource";
  public static final String USER_SINK = "userSink";

  public static final Schema USER_SCHEMA = Schema.recordOf(
    "user",
    Schema.Field.of("Lastname", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("Firstname", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("profession", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("age", Schema.of(Schema.Type.INT)));
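
  // Deduplicate plugin configuration: deduplicate on the "profession" field and keep the
  // record with the minimum "age" within each group of duplicates.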
  private static final Map<String, String> CONFIG_MAP = new ImmutableMap.Builder<String, String>()
    .put("uniqueFields", "profession")
    .put("filterOperation", "age:Min")
    .build();

  @Category({
    RequiresSpark.class
  })
  @Test
  public void testDeduplicateSpark() throws Exception {
    testDeduplicate(Engine.SPARK);
  }
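
  // Builds a Table source -> Deduplicate -> SnapshotAvro sink pipeline, ingests the test
  // users, runs the workflow, and verifies the deduplicated records written to the sink.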
  private void testDeduplicate(Engine engine) throws Exception {
    ETLStage userSourceStage =
      new ETLStage("users", new ETLPlugin("Table",
                                          BatchSource.PLUGIN_TYPE,
                                          ImmutableMap.of(
                                            Properties.BatchReadableWritable.NAME, USER_SOURCE,
                                            Properties.Table.PROPERTY_SCHEMA, USER_SCHEMA.toString()), null));

    ETLStage userSinkStage = new ETLStage(USER_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE,
                                                                   ImmutableMap.<String, String>builder()
                                                                     .put(Properties.BatchReadableWritable.NAME, USER_SINK)
                                                                     .put("schema", USER_SCHEMA.toString())
                                                                     .build(), null));

    ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate",
                                                                         BatchAggregator.PLUGIN_TYPE,
                                                                         CONFIG_MAP, null));

    ETLBatchConfig config = ETLBatchConfig.builder("* * * * *")
      .addStage(userSourceStage)
      .addStage(userSinkStage)
      .addStage(userGroupStage)
      .addConnection(userSourceStage.getName(), userGroupStage.getName())
      .addConnection(userGroupStage.getName(), userSinkStage.getName())
      .setEngine(engine)
      .setDriverResources(new Resources(2048))
      .setResources(new Resources(2048))
      .build();

    ingestInputData(USER_SOURCE);

    AppRequest<ETLBatchConfig> request = getBatchAppRequestV2(config);
    ApplicationId appId = TEST_NAMESPACE.app("deduplicate-test");
    ApplicationManager appManager = deployApplication(appId, request);

    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
    startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES);

    // Deploy an application with a service to get partitionedFileset data for verification
    ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class);
    ServiceManager serviceManager = applicationManager.getServiceManager(
      SnapshotFilesetService.class.getSimpleName());
    startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING);

    org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser().parse(USER_SCHEMA.toString());
    // output has these records:
    // 1: shelton, alex, professor, 45
    // 3: schuster, chris, accountant, 23
    // 5: gamal, ali, engineer, 28
    GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema)
      .set("Lastname", "Shelton")
      .set("Firstname", "Alex")
      .set("profession", "professor")
      .set("age", 45)
      .build();

    GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema)
      .set("Lastname", "Schuster")
      .set("Firstname", "Chris")
      .set("profession", "accountant")
      .set("age", 23)
      .build();

    GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema)
      .set("Lastname", "Gamal")
      .set("Firstname", "Ali")
      .set("profession", "engineer")
      .set("age", 28)
      .build();

    Set<GenericRecord> expected = ImmutableSet.of(record1, record2, record3);
    // verify output
    Assert.assertEquals(expected, readOutput(serviceManager, USER_SINK, USER_SCHEMA));
  }
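
  // Reads the sink's snapshot fileset through SnapshotFilesetService and decodes the
  // returned Avro files into GenericRecords.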
  private Set<GenericRecord> readOutput(ServiceManager serviceManager, String sink, Schema schema)
    throws IOException {
    URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                         String.format("read/%s", sink));
    HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken());

    Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode());

    Map<String, byte[]> map = ObjectResponse.<Map<String, byte[]>>fromJsonBody(
      response, new TypeToken<Map<String, byte[]>>() { }.getType()).getResponseObject();

    return parseOutput(map, schema);
  }

  private Set<GenericRecord> parseOutput(Map<String, byte[]> contents, Schema schema) throws IOException {
    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString());
    Set<GenericRecord> records = new HashSet<>();
    for (Map.Entry<String, byte[]> entry : contents.entrySet()) {
      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
      try (DataFileStream<GenericRecord> fileStream = new DataFileStream<>(
        new ByteArrayInputStream(entry.getValue()), datumReader)) {
        for (GenericRecord record : fileStream) {
          records.add(record);
        }
      }
    }
    return records;
  }
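
  // Writes the five input user rows into the source Table dataset.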
  private void ingestInputData(String inputDatasetName) throws Exception {
    // 1: shelton, alex, professor, 45
    // 2: seitz, bob, professor, 50
    // 3: schuster, chris, accountant, 23
    // 4: bolt, henry, engineer, 30
    // 5: gamal, ali, engineer, 28
    DataSetManager<Table> inputManager = getTableDataset(inputDatasetName);
    Table inputTable = inputManager.get();
    putValues(inputTable, 1, "Shelton", "Alex", "professor", 45);
    putValues(inputTable, 2, "Seitz", "Bob", "professor", 50);
    putValues(inputTable, 3, "Schuster", "Chris", "accountant", 23);
    putValues(inputTable, 4, "Bolt", "Henry", "engineer", 30);
    putValues(inputTable, 5, "Gamal", "Ali", "engineer", 28);
    inputManager.flush();
  }

  private void putValues(Table inputTable, int index, String lastname, String firstname, String profession,
                         int age) {
    Put put = new Put(Bytes.toBytes(index));
    put.add("Lastname", lastname);
    put.add("Firstname", firstname);
    put.add("profession", profession);
    put.add("age", age);
    inputTable.put(put);
  }
}
