Skip to content

Commit b7da1fe

Browse files
Error management for Wrangler Transform plugin
1 parent 0f6e1b4 commit b7da1fe

File tree

3 files changed

+154
-29
lines changed

3 files changed

+154
-29
lines changed

wrangler-transform/src/main/java/io/cdap/wrangler/Precondition.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.wrangler;
1818

19+
import io.cdap.cdap.api.exception.ErrorType;
1920
import io.cdap.wrangler.api.Row;
2021
import org.apache.commons.jexl3.scripting.JexlScriptEngine;
2122

@@ -72,9 +73,10 @@ public boolean apply(Row row) throws PreconditionException {
7273
scriptContext.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
7374
Object result = script.eval(scriptContext);
7475
if (!(result instanceof Boolean)) {
75-
throw new PreconditionException(
76-
String.format("Precondition '%s' does not result in true or false.", condition)
77-
);
76+
String errorMessage = String.format("Precondition '%s' does not result in true or false.",
77+
condition);
78+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(null, errorMessage,
79+
ErrorType.USER, false);
7880
}
7981
return (Boolean) result;
8082
} catch (ScriptException e) {

wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.cdap.api.annotation.Plugin;
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
27+
import io.cdap.cdap.api.exception.ErrorType;
2728
import io.cdap.cdap.api.metrics.Metrics;
2829
import io.cdap.cdap.api.plugin.PluginConfig;
2930
import io.cdap.cdap.api.plugin.PluginProperties;
@@ -53,6 +54,7 @@
5354
import io.cdap.wrangler.api.EntityCountMetric;
5455
import io.cdap.wrangler.api.ErrorRecord;
5556
import io.cdap.wrangler.api.ExecutorContext;
57+
import io.cdap.wrangler.api.RecipeException;
5658
import io.cdap.wrangler.api.RecipeParser;
5759
import io.cdap.wrangler.api.RecipePipeline;
5860
import io.cdap.wrangler.api.RecipeSymbol;
@@ -243,9 +245,13 @@ public void configurePipeline(PipelineConfigurer configurer) {
243245
}
244246
}
245247
} catch (CompileException e) {
246-
collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
248+
collector.addFailure(
249+
String.format("Compilation error occurred, %s: %s ", e.getClass().getName(),
250+
e.getMessage()), null);
247251
} catch (DirectiveParseException e) {
248-
collector.addFailure(e.getMessage(), null);
252+
collector.addFailure(
253+
String.format("Error parsing directive, %s: %s", e.getClass().getName(),
254+
e.getMessage()), null);
249255
}
250256

251257
// Based on the configuration create output schema.
@@ -254,8 +260,9 @@ public void configurePipeline(PipelineConfigurer configurer) {
254260
oSchema = Schema.parseJson(config.schema);
255261
}
256262
} catch (IOException e) {
257-
collector.addFailure("Invalid output schema.", null)
258-
.withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
263+
collector.addFailure(
264+
String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()),
265+
null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
259266
}
260267

261268
// Check if jexl pre-condition is not null or empty and if so compile expression.
@@ -265,7 +272,9 @@ && checkPreconditionNotEmpty(false)) {
265272
try {
266273
new Precondition(config.getPreconditionJEXL());
267274
} catch (PreconditionException e) {
268-
collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION);
275+
collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
276+
e.getClass().getName(), e.getMessage()), null)
277+
.withConfigProperty(Config.NAME_PRECONDITION);
269278
}
270279
}
271280
}
@@ -276,8 +285,11 @@ && checkPreconditionNotEmpty(false)) {
276285
}
277286

278287
} catch (Exception e) {
279-
LOG.error(e.getMessage());
280-
collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
288+
LOG.error("Error occurred during configuration of the plugin, {}: {}", e.getClass().getName(),
289+
e.getMessage());
290+
collector.addFailure(
291+
String.format("Error occurred during configuration of the plugin, %s: %s",
292+
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
281293
}
282294
}
283295

@@ -319,7 +331,12 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
319331
// Parse the recipe and extract all the instances of directives
320332
// to be processed for extracting lineage.
321333
RecipeParser recipe = getRecipeParser(context);
322-
List<Directive> directives = recipe.parse();
334+
List<Directive> directives;
335+
try {
336+
directives = recipe.parse();
337+
} catch (RecipeException e) {
338+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM, false);
339+
}
323340
emitDirectiveMetrics(directives, context.getMetrics());
324341

325342
LineageOperations lineageOperations = new LineageOperations(input, output, directives);
@@ -345,10 +362,11 @@ public void initialize(TransformContext context) throws Exception {
345362
try {
346363
oSchema = Schema.parseJson(config.schema);
347364
} catch (IOException e) {
348-
throw new IllegalArgumentException(
349-
String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.",
350-
context.getStageName()), e
351-
);
365+
String errorMessage = String.format("Error in stage '%s'. Format of output schema specified "
366+
+ "is invalid. Please check the format. %s: %s", context.getStageName(),
367+
e.getClass().getName(), e.getMessage());
368+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(
369+
new IllegalArgumentException(errorMessage, e), errorMessage, ErrorType.USER, false);
352370
}
353371

354372
// Check if jexl pre-condition is not null or empty and if so compile expression.
@@ -358,7 +376,7 @@ && checkPreconditionNotEmpty(false)) {
358376
try {
359377
condition = new Precondition(config.getPreconditionJEXL());
360378
} catch (PreconditionException e) {
361-
throw new IllegalArgumentException(e.getMessage(), e);
379+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM, false);
362380
}
363381
}
364382
}
@@ -367,7 +385,11 @@ && checkPreconditionNotEmpty(false)) {
367385
// Create the pipeline executor with context being set.
368386
pipeline = new RecipePipelineExecutor(recipe, ctx);
369387
} catch (Exception e) {
370-
throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
388+
String errorMessage = String.format(
389+
"Error in stage '%s'. Please check the configuration or input data. %s: %s",
390+
context.getStageName(), e.getClass().getName(), e.getMessage());
391+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, errorMessage,
392+
ErrorType.SYSTEM, false);
371393
}
372394

373395
String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
@@ -437,8 +459,10 @@ && checkPreconditionNotEmpty(false)) {
437459
}
438460
if (WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled(getContext())
439461
&& onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
440-
throw new Exception(
441-
String.format("Errors in Wrangler Transformation - %s", errorMessages));
462+
String errorReason = String.format("Errors in Wrangler Transformation - %s",
463+
errorMessages);
464+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(null, errorReason,
465+
ErrorType.SYSTEM, true);
442466
}
443467
}
444468
} catch (Exception e) {
@@ -457,8 +481,10 @@ && checkPreconditionNotEmpty(false)) {
457481
getContext().getStageName(), e.getMessage()),
458482
"value", String.valueOf(errorCounter)
459483
));
460-
throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s",
461-
getContext().getStageName(), e.getMessage()), e);
484+
String errorMessage = String.format("Pipeline failed at stage:%s, %s: %s",
485+
getContext().getStageName(), e.getClass().getName(), e.getMessage());
486+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, errorMessage,
487+
ErrorType.SYSTEM, true);
462488
}
463489
// If it's 'skip-on-error' we continue processing and don't emit any error records.
464490
return;
@@ -553,18 +579,32 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
553579
* @throws DirectiveLoadException
554580
* @throws DirectiveParseException
555581
*/
556-
private RecipeParser getRecipeParser(StageContext context)
557-
throws DirectiveLoadException, DirectiveParseException {
582+
private RecipeParser getRecipeParser(StageContext context) {
558583

559584
registry = new CompositeDirectiveRegistry(SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context));
560-
registry.reload(context.getNamespace());
585+
try {
586+
registry.reload(context.getNamespace());
587+
} catch (DirectiveLoadException e) {
588+
LOG.error("Failed to reload the directive registry for namespace "
589+
+ "'{}' at stage '{}'. Please verify the namespace and ensure the directives are "
590+
+ "correctly configured. {}: {}", context.getNamespace(), context.getStageName(),
591+
e.getClass().getName(), e.getMessage());
592+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.USER, false);
593+
}
561594

562595
String directives = config.getDirectives();
563596
if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
564597
directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives());
565598
}
566599

567-
return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
600+
try {
601+
return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
602+
} catch (DirectiveParseException e) {
603+
LOG.error("Failed to parse directives for namespace '{}' at stage "
604+
+ "'{}'. Please verify the directives and ensure they are correctly formatted. {}, {}",
605+
context.getNamespace(), context.getStageName(), e.getClass().getName(), e.getMessage());
606+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.USER, false);
607+
}
568608
}
569609

570610
@Override
@@ -573,7 +613,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R
573613
&& checkPreconditionNotEmpty(true)) {
574614

575615
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
576-
throw new RuntimeException("SQL Precondition feature is not available");
616+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(null,
617+
"SQL Precondition feature is not available", ErrorType.SYSTEM, true);
577618
}
578619

579620
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
@@ -598,11 +639,18 @@ private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranf
598639
* @param directives a list of Wrangler directives
599640
* @param metrics CDAP {@link Metrics} object using which metrics can be emitted
600641
*/
601-
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) throws DirectiveLoadException {
642+
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) {
602643
for (Directive directive : directives) {
603644
// skip emitting metrics if the directive is not system directive
604-
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
605-
continue;
645+
try {
646+
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
647+
continue;
648+
}
649+
} catch (DirectiveLoadException e) {
650+
LOG.error("Error loading system directive '{}'. {}: {}",
651+
directive.define().getDirectiveName(), e.getClass().getName(), e.getMessage());
652+
throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM,
653+
false);
606654
}
607655
List<EntityCountMetric> countMetrics = new ArrayList<>();
608656

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.wrangler;
18+
19+
import com.google.common.base.Throwables;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorType;
22+
import io.cdap.cdap.api.exception.ErrorUtils;
23+
import io.cdap.cdap.api.exception.ProgramFailureException;
24+
import io.cdap.wrangler.api.DirectiveLoadException;
25+
import io.cdap.wrangler.api.DirectiveParseException;
26+
import io.cdap.wrangler.api.RecipeException;
27+
import java.util.List;
28+
29+
/**
30+
* Util file to handle exceptions caught in Wrangler plugin
31+
*/
32+
public class WranglerUtil {
33+
34+
private WranglerUtil() {
35+
throw new IllegalStateException("Utility class");
36+
}
37+
38+
public static ProgramFailureException getProgramFailureExceptionDetailsFromChain(Throwable e,
39+
String errorMessage, ErrorType errorType, boolean dependency) {
40+
if (e != null) {
41+
List<Throwable> causalChain = Throwables.getCausalChain(e);
42+
for (Throwable t : causalChain) {
43+
if (t instanceof ProgramFailureException) {
44+
// Avoid double wrap
45+
return (ProgramFailureException) t;
46+
}
47+
if (t instanceof DirectiveLoadException || t instanceof DirectiveParseException
48+
|| t instanceof RecipeException || t instanceof PreconditionException) {
49+
return getProgramFailureException((Exception) t, errorType, dependency);
50+
}
51+
}
52+
}
53+
// If no predefined exception found in the causal chain, return generic program failure exception
54+
return ErrorUtils.getProgramFailureException(
55+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
56+
errorType, dependency, e);
57+
}
58+
59+
/**
60+
* Get a ProgramFailureException with the given error information from {@link Exception}.
61+
*
62+
* @param exception The Exception to get the error information from.
63+
* @param errorType The ErrorType to get the error type information.
64+
* @param dependency The dependency to show if it depends on external source or not
65+
* @return A ProgramFailureException with the given error information.
66+
*/
67+
private static ProgramFailureException getProgramFailureException(Exception exception,
68+
ErrorType errorType, boolean dependency) {
69+
String errorMessage = exception.getMessage();
70+
return ErrorUtils.getProgramFailureException(
71+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
72+
errorType, dependency, exception);
73+
}
74+
75+
}

0 commit comments

Comments
 (0)