Skip to content

Commit

Permalink
[SPARTA-3225] Quality rules move and environment var (#1763)
Browse files Browse the repository at this point in the history
* [SParta-NA] fix/force to string (#1735)

* Force toSTring

* Added a test rounding

* [SPARTA-NA] fix/parameters in run (#1755)
  • Loading branch information
Frannie-Ludmilla authored and compae committed Jun 14, 2019
1 parent 7df6ff0 commit b2d1bfe
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ trait DistributedMonad[Underlying[Row]] extends SLF4JLogging with Serializable {
}

val qualityRulesFailingDataFrame = cachedRDD.map { row =>
val failingQRs = sparkRules.filter(rule => !rule.composedPredicates(row)).map(_.id)
val passingQRs = sparkRules.filterNot(x => failingQRs.contains(x.id)).map(_.id)
val failingQRs = sparkRules.filter(rule => !rule.composedPredicates(row)).map(_.id.toString)
val passingQRs = sparkRules.filterNot(x => failingQRs.contains(x.id)).map(_.id.toString)
if(failingQRs.nonEmpty)
Row.fromSeq(row.toSeq ++ Seq(passingQRs) ++ Seq(failingQRs) ++ Seq(executionId) ++ Seq(executionTime))
else Row.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ class SparkQualityRuleThreshold(spartaQualityRuleThreshold: SpartaQualityRuleThr
}
}

private def roundTwoDecimalPositions(numberToRound: Double): Double = math.round(numberToRound * hundredDouble)/hundredDouble
private def roundTwoDecimalPositions(numberToRound: Double): Double = math.round(numberToRound * hundredDouble * hundredDouble)/hundredDouble * hundredDouble
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ class QualityRulesIT extends TemporalSparkContext with Matchers {
actionType = SpartaQualityRuleThresholdActionType(path = Some("error"), `type` = "ACT_PASS")
)

val percentageThresholdNotToRound: SpartaQualityRuleThreshold = SpartaQualityRuleThreshold(
value = 79.98,
operation = "<=",
`type` = "%",
actionType = SpartaQualityRuleThresholdActionType(path = Some("error"), `type` = "ACT_PASS")
)

def emptyFunction: (DataFrame, SaveModeEnum.Value, Map[String, String]) => Unit =
(_, _, _) => println("Fire in the hole!")

Expand Down Expand Up @@ -582,4 +589,33 @@ class QualityRulesIT extends TemporalSparkContext with Matchers {
result.head.satisfied shouldEqual true
}

it should "not round more than 2 decimals" in {

val seqQualityRules = Seq(
SpartaQualityRulePredicate(`type` = Some("aaa"), order = 1, operands = Seq("yellow"), field = "color", operation = "NOT lIKe"))

val qualityRuleLike = SpartaQualityRule(id = 1, metadataPath = "blabla1",
name = "no yellow", qualityRuleScope = "data", logicalOperator = "and",
enable = true,
threshold = percentageThresholdNotToRound,
predicates = seqQualityRules,
stepName = "tableA",
outputName = "")

val result = classTest.get.writeRDDTemplate(dataSet.get,
outputOptions,
errorManagement,
Seq.empty[OutputStep[RDD]],
Seq("input1", "transformation1"),
Seq(qualityRuleLike),
emptyFunction)

result should not be empty

result.head.numDiscardedEvents shouldEqual 1
result.head.numPassedEvents shouldEqual 4
result.head.numTotalEvents shouldEqual 5
result.head.satisfied shouldEqual false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.event.slf4j.SLF4JLogging
import com.github.mustachejava.DefaultMustacheFactory
import com.stratio.sparta.core.models.WorkflowValidationMessage
import com.stratio.sparta.serving.core.actor.ParametersListenerActor._
import com.stratio.sparta.serving.core.constants.AppConstant
import com.stratio.sparta.serving.core.factory.PostgresDaoFactory
import com.stratio.sparta.serving.core.models.SpartaSerializer
import com.stratio.sparta.serving.core.models.workflow.{Workflow, WorkflowExecutionContext, WorkflowIdExecutionContext, _}
Expand Down Expand Up @@ -109,13 +110,21 @@ class ParametersListenerActor extends Actor with SpartaSerializer with SLF4JLogg
): Future[ParametersToApplyContext] = {
for {
workflowParamLists <- getVariablesFromParamLists(workflow.settings.global.parametersLists)
parentsParamLists <- getParentsFromParamLists(workflow.settings.global.parametersLists)
paramListVariables <- getVariablesFromParamLists(executionContext.paramsLists)
globalParametersVariables <- getVariablesFromGlobalParameters
} yield {
val parametersToApply = globalParametersVariables ++ workflowParamLists ++
paramListVariables ++ executionContext.toParametersMap
val parametersWithoutValue = workflow.settings.global.parametersUsed.filterNot(parameter =>
parametersToApply.contains(parameter))
val parametersFromExecutionContext = executionContext.toParametersMap
val parentsParametersAndGlobal = parentsParamLists :+ "Global"
val executionParametersAddPrefixes: Map[String, String] = for {
(paramName, paramValue) <- parametersFromExecutionContext.filterKeys(!_.contains("."))
parentParamList <- parentsParametersAndGlobal
} yield (s"$parentParamList.$paramName", paramValue)

val parametersToApply = globalParametersVariables ++ workflowParamLists ++
paramListVariables ++ parametersFromExecutionContext ++ executionParametersAddPrefixes
val parametersWithoutValue = workflow.settings.global.parametersUsed.filterNot(parameter =>
parametersToApply.contains(parameter))

ParametersToApplyContext(parametersToApply, parametersWithoutValue)
}
Expand Down Expand Up @@ -227,6 +236,14 @@ class ParametersListenerActor extends Actor with SpartaSerializer with SLF4JLogg
}
}

def getParentsFromParamLists(paramsLists: Seq[String]): Future[Seq[String]] = {
Future.sequence {
paramsLists.map(paramList => parameterListService.findByName(paramList))
}.map { paramsLists =>
paramsLists.map(paramList => paramList.parent.getOrElse(paramList.name))
}
}

def getVariablesFromGlobalParameters: Future[Map[String, String]] =
for {
globalParameters <- globalParametersService.find()
Expand Down
1 change: 0 additions & 1 deletion serving-core/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ akka.persistence.journal.plugin = "inmemory-journal"
akka.persistence.snapshot-store.plugin = "inmemory-snapshot-store"
akka.persistence.journal.leveldb.native = off


# CONFIG

# Driver jar served by Sparta in this location
Expand Down

0 comments on commit b2d1bfe

Please sign in to comment.