Skip to content

Commit b4dd875

Browse files
committed
move relation execution into directive
1 parent 400892a commit b4dd875

File tree

4 files changed

+33
-28
lines changed

4 files changed

+33
-28
lines changed

wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package io.cdap.wrangler.api;
1818

19+
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
20+
import io.cdap.cdap.etl.api.relational.Relation;
21+
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
1922
import io.cdap.wrangler.api.parser.UsageDefinition;
2023

2124
import java.util.List;
@@ -51,7 +54,8 @@
5154
* }
5255
* </code>
5356
*/
54-
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics {
57+
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics,
58+
LinearRelationalTransform {
5559
/**
5660
* This defines a interface variable that is static and final for specify
5761
* the {@code type} of the plugin this interface would provide.
@@ -126,4 +130,11 @@ default List<EntityCountMetric> getCountMetrics() {
126130
// no op
127131
return null;
128132
}
133+
134+
@Override
135+
default Relation transform(RelationalTranformContext relationalTranformContext,
136+
Relation relation) {
137+
// no-op
138+
return relation;
139+
}
129140
}

wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616

1717
package io.cdap.wrangler.api;
1818

19+
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
20+
import io.cdap.cdap.etl.api.relational.Relation;
21+
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
22+
1923
/**
2024
* Directive interface which supports Relational transformations
2125
*/
22-
public interface RelationalDirective extends Directive {
23-
24-
/**
25-
* returns sql expression
26-
* @return expression
27-
*/
28-
String getSQL();
26+
public interface RelationalDirective extends Directive, LinearRelationalTransform {
2927

28+
@Override
29+
default Relation transform(RelationalTranformContext relationalTranformContext,
30+
Relation relation) {
31+
// no-op
32+
return relation;
33+
}
3034
}

wrangler-core/src/main/java/io/cdap/directives/column/Drop.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.cdap.cdap.api.annotation.Description;
2020
import io.cdap.cdap.api.annotation.Name;
2121
import io.cdap.cdap.api.annotation.Plugin;
22+
import io.cdap.cdap.etl.api.relational.Relation;
23+
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
2224
import io.cdap.wrangler.api.Arguments;
2325
import io.cdap.wrangler.api.Directive;
2426
import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -44,7 +46,7 @@
4446
@Name(Drop.NAME)
4547
@Categories(categories = { "column"})
4648
@Description("Drop one or more columns.")
47-
public class Drop implements RelationalDirective, Lineage {
49+
public class Drop implements Directive, Lineage {
4850
public static final String NAME = "drop";
4951

5052
// Columns to be dropped.
@@ -91,12 +93,11 @@ public Mutation lineage() {
9193
}
9294

9395
@Override
94-
public String getSQL() {
95-
String sql = "DROP COLUMN ";
96-
for (String col : columns) {
97-
sql += col + ",";
96+
public Relation transform(RelationalTranformContext relationalTranformContext,
97+
Relation relation) {
98+
for (String col: columns) {
99+
relation = relation.dropColumn(col);
98100
}
99-
sql = sql.substring(0, sql.length() - 1);
100-
return sql;
101+
return relation;
101102
}
102103
}

wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -589,8 +589,6 @@ && checkPreconditionNotEmpty(true))) {
589589
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
590590
Relation filteredRelation = relation.filter(filterExpression);
591591

592-
ExpressionFactory<String> expFactory = expressionFactory.get();
593-
594592
String recipe = config.getDirectives();
595593

596594
registry = SystemDirectiveRegistry.INSTANCE;
@@ -611,17 +609,8 @@ && checkPreconditionNotEmpty(true))) {
611609
}
612610

613611
for (Directive directive : directives) {
614-
// Expression exp = expFactory.compile(sql);
615-
if (!(directive instanceof RelationalDirective)) {
616-
throw new RuntimeException("Directive is not relational Directive");
617-
}
618-
// currently supporting only drop column
619-
// SQL will be returned as "DROP COLUMN col1, col2"
620-
String sql = ((RelationalDirective) directive).getSQL();
621-
List<String> cols = getColumnsOfDropSQL(sql);
622-
for (String col : cols) {
623-
filteredRelation = filteredRelation.dropColumn(col);
624-
}
612+
filteredRelation = directive
613+
.transform(relationalTranformContext, filteredRelation);
625614
}
626615
return filteredRelation;
627616
}

0 commit comments

Comments
 (0)