Skip to content

Commit 1743579

Browse files
samratmitra-0812dougb
authored andcommitted
String Length Check (#14)
* String Length Check * Update README.md Co-Authored-By: Colin Dean <[email protected]> * Update README.md Co-Authored-By: Colin Dean <[email protected]> * Update README.md Co-Authored-By: Colin Dean <[email protected]> * Removed unused imports and variables * Unit Test Fix:Check that exactly one of the 2 errors are present
1 parent 102cfb9 commit 1743579

File tree

4 files changed

+548
-1
lines changed

4 files changed

+548
-1
lines changed

README.md

+18-1
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,17 @@ Takes 2 - 4 parameters, described below. If the value in the column doesn't fall
240240

241241
**Note:** To specify another column in the table, you must prefix the column name with a **`** (backtick).
242242

243+
#### `stringLengthCheck`
244+
245+
Takes 2 or 3 parameters, described in the table below. The check fails if the length of the string in the column falls outside the range [`minValue`, `maxValue`] (both bounds inclusive).
246+
At least one of `minValue` or `maxValue` must be specified. The data type of `column` must be String.
247+
248+
| Arg | Type | Description |
249+
|-----|------|-------------|
250+
| `column` | String | Table column to be checked. The DataType of the column must be a String. |
251+
| `minValue` | Integer | Lower bound of the length of the string, inclusive. |
252+
| `maxValue` | Integer | Upper bound of the length of the string, inclusive. |
253+
243254
#### `rowCount`
244255

245256
The minimum number of rows a table must have to pass the validator.
@@ -324,6 +335,12 @@ tables:
324335
# nullCheck - checks if the column is null, counts number of rows with null for this column.
325336
- type: nullCheck
326337
column: occupation
338+
339+
# stringLengthCheck - checks if the length of the string in the column falls within the specified range, counts number of rows in which the length of the string is outside the specified range.
340+
- type: stringLengthCheck
341+
column: occupation
342+
minValue: 1
343+
maxValue: 5
327344
```
328345

329346
## Working with OOZIE Workflows
@@ -470,4 +487,4 @@ tables:
470487
471488
- type: nullCheck
472489
column: nullCol
473-
```
490+
```

src/main/scala/com/target/data_validator/validator/JsonDecoders.scala

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ object JsonDecoders extends LazyLogging {
1515
case "columnMaxCheck" => c.as[ColumnMaxCheck]
1616
case "rangeCheck" => RangeCheck.fromJson(c)
1717
case "uniqueCheck" => UniqueCheck.fromJson(c)
18+
case "stringLengthCheck" => StringLengthCheck.fromJson(c)
1819
case x => logger.error(s"Unknown Check `$x` in config!")
1920
throw new RuntimeException(s"Unknown Check in config `$x`")
2021
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.target.data_validator.validator
2+
3+
import com.target.data_validator.{JsonEncoders, ValidatorError, VarSubstitution}
4+
import com.target.data_validator.JsonUtils.debugJson
5+
import com.target.data_validator.validator.ValidatorBase._
6+
import com.typesafe.scalalogging.LazyLogging
7+
import io.circe.{DecodingFailure, HCursor, Json}
8+
import io.circe.syntax._
9+
import org.apache.spark.sql.DataFrame
10+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
11+
import org.apache.spark.sql.catalyst.expressions._
12+
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
13+
14+
case class StringLengthCheck(
15+
column: String,
16+
minValue: Option[Json],
17+
maxValue: Option[Json]
18+
) extends RowBased {
19+
20+
override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
21+
22+
val ret = StringLengthCheck(
23+
getVarSub(column, "column", dict),
24+
minValue.map(getVarSubJson(_, "minValue", dict)),
25+
maxValue.map(getVarSubJson(_, "maxValue", dict))
26+
)
27+
getEvents.foreach(ret.addEvent)
28+
ret
29+
}
30+
31+
private def cmpExpr(colExpr: Expression,
32+
value: Option[Json],
33+
cmp: (Expression, Expression) => Expression
34+
): Option[Expression] = {
35+
value.map { v => cmp(colExpr, createLiteralOrUnresolvedAttribute(IntegerType, v)) }
36+
}
37+
38+
override def colTest(schema: StructType, dict: VarSubstitution): Expression = {
39+
40+
val colExp = Length(UnresolvedAttribute(column))
41+
42+
val minValueExpression = cmpExpr(colExp, minValue, LessThan)
43+
val maxValueExpression = cmpExpr(colExp, maxValue, GreaterThan)
44+
45+
val ret = (minValueExpression, maxValueExpression) match {
46+
case (Some(x), None) => x
47+
case (None, Some(y)) => y
48+
case (Some(x), Some(y)) => Or(x, y)
49+
case _ => throw new RuntimeException("Must define min or max value.")
50+
}
51+
logger.debug(s"Expr: $ret")
52+
ret
53+
}
54+
55+
private def checkMinLessThanOrEqualToMax(values: List[Json]): Unit = {
56+
57+
if (values.forall(_.isNumber)) {
58+
values.flatMap(_.asNumber) match {
59+
case mv :: xv :: Nil if mv.toDouble > xv.toDouble =>
60+
addEvent(ValidatorError(s"min: ${minValue.get} must be less than or equal to max: ${maxValue.get}"))
61+
case _ =>
62+
}
63+
} else if (values.forall(_.isString)) {
64+
values.flatMap(_.asString) match {
65+
case mv :: xv :: Nil if mv == xv =>
66+
addEvent(ValidatorError(s"Min[String]: $mv must be less than max[String]: $xv"))
67+
case _ =>
68+
}
69+
} else {
70+
// Not Strings or Numbers
71+
addEvent(ValidatorError(s"Unsupported type in ${values.map(debugJson).mkString(", ")}"))
72+
}
73+
}
74+
75+
override def configCheck(df: DataFrame): Boolean = {
76+
77+
// Verify if at least one of min or max is specified.
78+
val values = (minValue::maxValue::Nil).flatten
79+
if (values.isEmpty) {
80+
addEvent(ValidatorError("Must define minValue or maxValue or both."))
81+
}
82+
83+
// Verify that min is less than max
84+
checkMinLessThanOrEqualToMax(values)
85+
86+
// Verify that the data type of the specified column is a String.
87+
val colType = findColumnInDataFrame(df, column)
88+
if (colType.isDefined) {
89+
val dataType = colType.get.dataType
90+
if (!(dataType.isInstanceOf[StringType])) {
91+
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
92+
}
93+
}
94+
95+
failed
96+
}
97+
98+
override def toJson: Json = {
99+
import JsonEncoders.eventEncoder
100+
val fields = Seq(
101+
("type", Json.fromString("stringLengthCheck")),
102+
("column", Json.fromString(column))
103+
) ++
104+
minValue.map(mv => ("minValue", mv)) ++
105+
maxValue.map(mv => ("maxValue", mv)) ++
106+
Seq(
107+
("events", getEvents.asJson)
108+
)
109+
Json.obj(fields: _*)
110+
}
111+
}
112+
113+
object StringLengthCheck extends LazyLogging {
114+
def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = {
115+
val column = c.downField("column").as[String].right.get
116+
val minValueJ = c.downField("minValue").as[Json].right.toOption
117+
val maxValueJ = c.downField("maxValue").as[Json].right.toOption
118+
119+
logger.debug(s"column: $column")
120+
logger.debug(s"minValue: $minValueJ type: ${minValueJ.getClass.getCanonicalName}")
121+
logger.debug(s"maxValue: $maxValueJ type: ${maxValueJ.getClass.getCanonicalName}")
122+
123+
c.focus.foreach {f => logger.info(s"StringLengthCheckJson: ${f.spaces2}")}
124+
scala.util.Right(StringLengthCheck(column, minValueJ, maxValueJ))
125+
}
126+
}

0 commit comments

Comments
 (0)