
Commit 102cfb9

Adds support for UniqueCheck (#11)

* Adds support for UniqueCheck
* Changes from code review feedback.
* Updates package and some imports.

1 parent: b60e5a0

File tree: 11 files changed (252 additions, 16 deletions)


README.md (10 additions, 1 deletion)

````diff
@@ -200,7 +200,7 @@ To validate an `.orc` file, specify `orcFile` and the path to the file, see belo
 
 ### Validators
 
-The third section are the validators. Currently 5 validators are supported `columnMaxCheck`, `negativeCheck`, `nullCheck`, `rangeCheck` and `rowCount`. To specify a validator, you first specify the `type` as one of the validators, then specify the arguments for that validator.
+The third section are the validators. To specify a validator, you first specify the type as one of the validators, then specify the arguments for that validator. Currently supported validators are listed below:
 
 #### `columnMaxCheck`
 
@@ -250,6 +250,15 @@ The minimum number of rows a table must have to pass the validator.
 
 See Example Config file below to see how the checks are configured.
 
+#### `uniqueCheck`
+
+This check is used to make sure all rows in the table are unique, only the columns specified are used to determine uniqueness.
+This is a costly check and requires an additional pass through the table.
+
+| Arg | Type | Description |
+|-----|------|-------------|
+| `columns` | Array[String] | Each set of values in these columns must be unique.
+
 ## Example Config
 
 ```yaml
````
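
To make the uniqueness semantics concrete, here is a plain-Scala sketch of the rule the check applies: the tuple of values across the listed columns must not repeat. The rows and column roles below are made up for illustration.

```scala
// Hypothetical rows keyed on (store, date); uniqueness is judged on the
// whole tuple, not on each column separately.
val rows = Seq(
  ("store1", "2019-01-01"),
  ("store1", "2019-01-02"),
  ("store1", "2019-01-01") // repeats the first (store, date) pair
)

// A uniqueCheck over these two columns would fail: one tuple occurs twice,
// even though no single column is unique on its own.
val hasDuplicates = rows.groupBy(identity).exists { case (_, group) => group.size > 1 }
// hasDuplicates == true
```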

src/main/scala/com/target/data_validator/Main.scala (4 additions, 1 deletion)

```diff
@@ -75,7 +75,10 @@ object Main extends LazyLogging with EventLog {
     mainConfig: CmdLineOptions,
     config: ValidatorConfig,
     varSub: VarSubstitution
-  ): Boolean = config.quickChecks(spark, varSub)
+  ): Boolean = {
+    logger.info("Running sparkChecks")
+    Seq(config.quickChecks(spark, varSub), config.costlyChecks(spark, varSub)).exists(x => x)
+  }
 
   /*
    * There are 2 types of errors we return (fatal, validator_status)
```
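
One detail worth noting in the new body: both passes always run. A minimal sketch with stand-in functions (names invented for illustration):

```scala
// Building the Seq evaluates both elements eagerly, so the costly checks run
// even when the quick checks have already failed; exists(x => x) then ORs
// the two outcomes into a single pass/fail result.
def quick(): Boolean = { println("quick checks ran"); true }
def costly(): Boolean = { println("costly checks ran"); false }

val anyFailed = Seq(quick(), costly()).exists(x => x)
// prints both lines; anyFailed == true
```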

src/main/scala/com/target/data_validator/ValidatorConfig.scala (6 additions, 0 deletions)

```diff
@@ -47,9 +47,15 @@ case class ValidatorConfig(
   }
 
   def quickChecks(session: SparkSession, dict: VarSubstitution): Boolean = {
+    logger.info("Running Quick Checks...")
     tables.map(_.quickChecks(session, dict)(this)).exists(x => x)
   }
 
+  def costlyChecks(session: SparkSession, dict: VarSubstitution): Boolean = {
+    logger.info("Running Costly Checks...")
+    tables.map(_.costlyChecks(session, dict)(this)).exists(x => x)
+  }
+
   def generateHTMLReport(): Tag = html(h1("Validator Report"), hr(), tables.map(_.generateHTMLReport()))
 
   def substituteVariables(varSub: VarSubstitution): Option[ValidatorConfig] = {
```

src/main/scala/com/target/data_validator/ValidatorTable.scala (18 additions, 5 deletions)

```diff
@@ -1,6 +1,6 @@
 package com.target.data_validator
 
-import com.target.data_validator.validator.{ColumnBased, RowBased, ValidatorBase}
+import com.target.data_validator.validator.{CheapCheck, ColumnBased, CostlyCheck, RowBased, ValidatorBase}
 import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}
@@ -75,9 +75,13 @@ abstract class ValidatorTable(
 
   def quickChecks(session: SparkSession, dict: VarSubstitution)(implicit vc: ValidatorConfig): Boolean = {
     val dataFrame = open(session).get
-    val checkSelects: Seq[Expression] = checks.map {
+    val qc: List[CheapCheck] = checks.flatMap {
+      case cc: CheapCheck => Some(cc)
+      case _ => None
+    }
+    val checkSelects: Seq[Expression] = qc.map {
       case colChk: ColumnBased => colChk.select(dataFrame.schema, dict)
-      case chk => Sum(chk.select(dataFrame.schema, dict)).toAggregateExpression()
+      case chk: RowBased => Sum(chk.select(dataFrame.schema, dict)).toAggregateExpression()
     }
 
     val cols: Seq[Column] = createCountSelect() ++ checkSelects.zipWithIndex.map {
@@ -98,8 +102,8 @@
     logger.info(s"Total Rows Processed: $count")
     addEvent(ValidatorCounter(s"RowCount for $label", count))
 
-    val failed = checks.zipWithIndex.map {
-      case (check: ValidatorBase, idx: Int) => check.quickCheck(results, count, idx + 1)
+    val failed = qc.zipWithIndex.map {
+      case (check: CheapCheck, idx: Int) => check.quickCheck(results, count, idx + 1)
     }.exists(x => x)
 
     if (failed) {
@@ -112,6 +116,15 @@
     failed
   }
 
+  def costlyChecks(session: SparkSession, dict: VarSubstitution)(implicit vc: ValidatorConfig): Boolean = {
+    val df = open(session).get
+    val cc = checks.flatMap {
+      case cc: CostlyCheck => Some(cc)
+      case _ => None
+    }
+    cc.map(_.costlyCheck(df)).exists(x => x)
+  }
+
   def quickErrorDetails(dataFrame: DataFrame, dict: VarSubstitution)(implicit vc: ValidatorConfig): Unit = {
     val keySelect = createKeySelect(dataFrame)
     val failedChecksWithIndex = checks
```
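
The `flatMap` over a match is the pattern this file now uses to split the configured checks by capability. A self-contained sketch with stand-in types:

```scala
// Stand-in traits mirroring CheapCheck / CostlyCheck for illustration only.
sealed trait Check
case class Cheap(name: String) extends Check
case class Costly(name: String) extends Check

val checks: List[Check] = List(Cheap("nullCheck"), Costly("uniqueCheck"), Cheap("rowCount"))

// Keep only the cheap checks, preserving order, as quickChecks does above.
val cheapOnly: List[Cheap] = checks.flatMap {
  case c: Cheap => Some(c)
  case _ => None
}
// cheapOnly == List(Cheap("nullCheck"), Cheap("rowCount"))
```

`checks.collect { case c: Cheap => c }` would be the slightly more idiomatic equivalent, but the `flatMap`/`Option` form used in the diff behaves identically.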

src/main/scala/com/target/data_validator/validator/ColumnBased.scala (1 addition, 1 deletion)

```diff
@@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.aggregate.Max
 import org.apache.spark.sql.types._
 
-abstract class ColumnBased(column: String, condTest: Expression) extends ValidatorBase {
+abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck {
   override def select(schema: StructType, dict: VarSubstitution): Expression = condTest
 
   // ColumnBased checks don't have per row error details.
```

src/main/scala/com/target/data_validator/validator/JsonDecoders.scala (1 addition, 0 deletions)

```diff
@@ -14,6 +14,7 @@ object JsonDecoders extends LazyLogging {
       case "negativeCheck" => c.as[NegativeCheck]
       case "columnMaxCheck" => c.as[ColumnMaxCheck]
       case "rangeCheck" => RangeCheck.fromJson(c)
+      case "uniqueCheck" => UniqueCheck.fromJson(c)
       case x => logger.error(s"Unknown Check `$x` in config!")
         throw new RuntimeException(s"Unknown Check in config `$x`")
     }
```
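
A hedged sketch of how a `uniqueCheck` entry flows through this dispatch, using a hand-written JSON document (presumably the YAML config is parsed into circe `Json` before decoding; the column names here are invented, and the sketch assumes the circe parser module is on the classpath):

```scala
import io.circe.parser.parse

val doc = """{ "type": "uniqueCheck", "columns": ["store_id", "sale_date"] }"""

parse(doc) match {
  case Right(json) =>
    // The "type" field drives the dispatch above; fromJson itself only reads
    // the "columns" field.
    println(UniqueCheck.fromJson(json.hcursor)) // Right(UniqueCheck(List(store_id, sale_date)))
  case Left(err) => println(s"parse error: $err")
}
```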

src/main/scala/com/target/data_validator/validator/RowBased.scala (1 addition, 1 deletion)

```diff
@@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{NumericType, StructType}
 
-abstract class RowBased extends ValidatorBase {
+abstract class RowBased extends CheapCheck {
 
   val column: String
 
```

src/main/scala/com/target/data_validator/validator/UniqueCheck.scala (new file, 58 additions, 0 deletions)

```diff
@@ -0,0 +1,58 @@
+package com.target.data_validator.validator
+
+import com.target.data_validator.{ValidatorError, ValidatorGood, ValidatorTimer, VarSubstitution}
+import com.typesafe.scalalogging.LazyLogging
+import io.circe.{DecodingFailure, HCursor, Json}
+import io.circe.syntax._
+import org.apache.spark.sql.{Column, DataFrame}
+
+case class UniqueCheck(columns: Seq[String]) extends CostlyCheck {
+
+  override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
+    val newColumns = columns.map(getVarSub(_, "columns", dict))
+    val ret = UniqueCheck(newColumns)
+    this.getEvents.foreach(ret.addEvent)
+    ret
+  }
+
+  override def configCheck(df: DataFrame): Boolean = {
+    columns.exists(findColumnInDataFrame(df, _).isEmpty)
+  }
+
+  override def toJson: Json = {
+    import com.target.data_validator.JsonEncoders.eventEncoder
+    val fields = Seq(
+      ("type", Json.fromString("uniqueCheck")),
+      ("columns", Json.fromValues(columns.map(Json.fromString))),
+      ("failed", Json.fromBoolean(failed)),
+      ("events", this.getEvents.asJson))
+
+    Json.fromFields(fields)
+  }
+
+  override def costlyCheck(df: DataFrame): Boolean = {
+    val cols = columns.map(new Column(_))
+    val timer = new ValidatorTimer(s"UniqueCheck($columns)")
+    addEvent(timer)
+    // Note: this computes the count of the number of distinct keys (if you will) that have at least one duplicated row.
+    // It's not number of duplicated rows.
+    val ret = timer.time(df.select(cols: _*).groupBy(cols: _*).count().where("count > 1").count())
+    logger.info(s"costlyCheck: cols:$cols ret:$ret")
+    if (ret > 0) {
+      addEvent(ValidatorError(s"$ret duplicates found!"))
+    } else {
+      addEvent(ValidatorGood("no duplicates found."))
+    }
+
+    failed
+  }
+}
+
+object UniqueCheck extends LazyLogging {
+
+  def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = {
+    val columns = c.downField("columns").as[Seq[String]]
+    columns.right.map(UniqueCheck(_))
+  }
+
+}
```
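
The grouped-count query at the heart of `costlyCheck` can be exercised on its own. A standalone sketch against a throwaway local session, with made-up data and column names:

```scala
import org.apache.spark.sql.{Column, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("uniqueCheckSketch").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 1), ("b", 2)).toDF("key", "value")
val cols = Seq("key", "value").map(new Column(_))

// Same shape as costlyCheck: count the distinct (key, value) combinations
// that occur more than once. Here that is 1, for ("a", 1); as the code
// comment notes, this is the number of duplicated combinations, not the
// number of duplicated rows.
val duplicates = df.select(cols: _*).groupBy(cols: _*).count().where("count > 1").count()
// duplicates == 1
```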

src/main/scala/com/target/data_validator/validator/ValidatorBase.scala (20 additions, 6 deletions)

```diff
@@ -23,10 +23,6 @@ abstract class ValidatorBase(
 
   def configCheck(df: DataFrame): Boolean
 
-  def select(schema: StructType, dict: VarSubstitution): Expression
-
-  def quickCheck(r: Row, count: Long, idx: Int): Boolean
-
   def generateHTMLReport: Tag = {
     val d = div(cls := "check_report")
     if (failed) {
@@ -64,7 +60,7 @@ abstract class ValidatorBase(
         addEvent(ValidatorError(msg))
       }
     } else {
-      val msg = s"Column: $column not found in table."
+      val msg = s"Column: '$column' not found in table."
       logger.error(msg)
       addEvent(ValidatorError(msg))
     }
@@ -74,7 +70,9 @@ abstract class ValidatorBase(
   private[validator] def findColumnInDataFrame(dataFrame: DataFrame, column: String): Option[StructField] = {
     val ret = dataFrame.schema.fields.find(_.name == column)
     if (ret.isEmpty) {
-      addEvent(ValidatorError(s"Column: $column not found in schema."))
+      val msg = s"Column: '$column' not found in schema."
+      logger.error(msg)
+      addEvent(ValidatorError(msg))
     }
     ret
   }
@@ -244,3 +242,19 @@ object ValidatorBase extends LazyLogging {
     ret
   }
 }
+
+/**
+ * CheapChecks are checks that can be combined into the same pass through a table.
+ */
+trait CheapCheck extends ValidatorBase {
+  def select(schema: StructType, dict: VarSubstitution): Expression
+
+  def quickCheck(r: Row, count: Long, idx: Int): Boolean
+}
+
+/**
+ * CostlyChecks are checks that require their own pass through the table and therefore are most costly.
+ */
+trait CostlyCheck extends ValidatorBase {
+  def costlyCheck(df: DataFrame): Boolean
+}
```
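
The doc comment's claim that CheapChecks "can be combined into the same pass" comes down to Spark evaluating several aggregate expressions in a single scan. A small illustrative sketch (local session and column name invented), loosely mirroring the validators named in the README:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, max, sum, when}

val spark = SparkSession.builder.master("local[*]").appName("cheapChecksSketch").getOrCreate()
import spark.implicits._

val df = Seq(1, -2, 3).toDF("n")

// Three cheap checks folded into one select, i.e. one pass over the table:
val row = df.select(
  count("*"),                             // rowCount
  max(col("n")),                          // columnMaxCheck
  sum(when(col("n") < 0, 1).otherwise(0)) // negativeCheck: count of failing rows
).head
// one scan yields: count = 3, max = 3, negatives = 1
```

A CostlyCheck like `uniqueCheck`, by contrast, needs its own `groupBy` over the table and so cannot be folded into that single aggregate row.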

src/test/scala/com/target/data_validator/validator/RangeCheckSpec.scala (1 addition, 1 deletion)

```diff
@@ -64,7 +64,7 @@ class RangeCheckSpec extends FunSpec with Matchers with TestingSparkSession {
       None
     )
     assert(sut.configCheck(df))
-    assert(sut.getEvents contains ValidatorError("Column: bad_column_name not found in schema."))
+    assert(sut.getEvents contains ValidatorError("Column: 'bad_column_name' not found in schema."))
     assert(sut.failed)
   }
 
```
