diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 2996b6929..66879334f 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -1083,6 +1083,12 @@ object Check { /** A common assertion function checking if the value is 1 */ val IsOne: Double => Boolean = { _ == 1.0 } + def fromConstraint(constraint: Constraint, + description: String, + checkLevel: CheckLevel.Value = CheckLevel.Error): Check = { + Check(checkLevel, description, constraints = Seq(constraint)) + } + /** * Common assertion function checking if the value can be considered as normal (that no * anomalies were detected), given the anomaly detection strategy and details on how to retrieve diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index bbfdbd813..8bd83990e 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -114,6 +114,10 @@ object Constraint { val size = Size(where) + fromAnalyzer(size, assertion, hint) + } + + def fromAnalyzer(size: Size, assertion: Long => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[NumMatches, Double, Long](size, assertion, Some(_.toLong), hint) @@ -194,10 +198,16 @@ object Constraint { val completeness = Completeness(column, where) + this.fromAnalyzer(completeness, assertion, hint) + } + + def fromAnalyzer(completeness: Completeness, + assertion: Double => Boolean, + hint: Option[String] = None): Constraint = { val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double]( completeness, assertion, hint = hint) - new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-$column") + new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-${completeness.column}") } /** @@ -238,12 +248,16 @@ object Constraint { val uniqueness = Uniqueness(columns, where) + fromAnalyzer(uniqueness, assertion, hint) + } + + def fromAnalyzer(uniqueness: Uniqueness, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double]( uniqueness, assertion, hint = hint) new RowLevelGroupedConstraint(constraint, s"UniquenessConstraint($uniqueness)", - columns) + uniqueness.columns) } /** @@ -264,6 +278,10 @@ object Constraint { val distinctness = Distinctness(columns, where) + fromAnalyzer(distinctness, assertion, hint) + } + + def fromAnalyzer(distinctness: Distinctness, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double]( distinctness, assertion, hint = hint) @@ -287,12 +305,20 @@ object Constraint { : Constraint = { val uniqueValueRatio = UniqueValueRatio(columns, where) + fromAnalyzer(uniqueValueRatio, assertion, hint) + } + + def fromAnalyzer( + uniqueValueRatio: UniqueValueRatio, + assertion: Double => Boolean, + hint: Option[String]) + : Constraint = { val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double]( uniqueValueRatio, assertion, hint = hint) new RowLevelGroupedConstraint(constraint, s"UniqueValueRatioConstraint($uniqueValueRatio", - columns) + uniqueValueRatio.columns) } /** @@ -314,15 +340,18 @@ object Constraint { val compliance = Compliance(name, column, where, columns) + fromAnalyzer(compliance, assertion, hint) + } + + private def fromAnalyzer(compliance: Compliance, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double]( compliance, assertion, hint = hint) -// new NamedConstraint(constraint, s"ComplianceConstraint($compliance)") val sparkAssertion = org.apache.spark.sql.functions.udf(assertion) new RowLevelAssertedConstraint( constraint, s"ComplianceConstraint($compliance)", - s"ColumnsCompliance-$column", + s"ColumnsCompliance-${compliance.predicate}", sparkAssertion) } @@ -346,6 +375,16 @@ object Constraint { val patternMatch = PatternMatch(column, pattern, where) + fromAnalyzer(patternMatch, pattern, assertion, name, hint) + } + + def fromAnalyzer( + patternMatch: PatternMatch, + pattern: Regex, + assertion: Double => Boolean, + name: Option[String], + hint: Option[String]): Constraint = { + val column: String = patternMatch.column val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double]( patternMatch, assertion, hint = hint) @@ -375,6 +414,10 @@ object Constraint { val entropy = Entropy(column, where) + fromAnalyzer(entropy, assertion, hint) + } + + def fromAnalyzer(entropy: Entropy, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double]( entropy, assertion, hint = hint) @@ -401,6 +444,14 @@ object Constraint { val mutualInformation = MutualInformation(Seq(columnA, columnB), where) + fromAnalyzer(mutualInformation, assertion, hint) + } + + def fromAnalyzer( + mutualInformation: MutualInformation, + assertion: Double => Boolean, + hint: Option[String]) + : Constraint = { val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double]( mutualInformation, assertion, hint = hint) @@ -427,6 +478,10 @@ object Constraint { val approxQuantile = ApproxQuantile(column, quantile, where = where) + fromAnalyzer(approxQuantile, assertion, hint) + } + + def fromAnalyzer(approxQuantile: ApproxQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[ApproxQuantileState, Double, Double]( approxQuantile, assertion, hint = hint) @@ -450,6 +505,11 @@ object Constraint { val maxLength = MaxLength(column, where, analyzerOptions) + fromAnalyzer(maxLength, assertion, hint) + } + + def fromAnalyzer(maxLength: MaxLength, assertion: Double => Boolean, hint: Option[String]): Constraint = { + val column: String = maxLength.column val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion, hint = hint) @@ -479,6 +539,11 @@ object Constraint { val minLength = MinLength(column, where, analyzerOptions) + fromAnalyzer(minLength, assertion, hint) + } + + def fromAnalyzer(minLength: MinLength, assertion: Double => Boolean, hint: Option[String]): Constraint = { + val column: String = minLength.column val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion, hint = hint) @@ -508,6 +573,11 @@ object Constraint { val minimum = Minimum(column, where) + fromAnalyzer(minimum, assertion, hint) + } + + def fromAnalyzer(minimum: Minimum, assertion: Double => Boolean, hint: Option[String]): Constraint = { + val column: String = minimum.column val constraint = AnalysisBasedConstraint[MinState, Double, Double](minimum, assertion, hint = hint) @@ -535,6 +605,11 @@ object Constraint { val maximum = Maximum(column, where) + fromAnalyzer(maximum, assertion, hint) + } + + def fromAnalyzer(maximum: Maximum, assertion: Double => Boolean, hint: Option[String]): Constraint = { + val column: String = maximum.column val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maximum, assertion, hint = hint) @@ -562,11 +637,16 @@ object Constraint { val mean = Mean(column, where) + fromAnalyzer(mean, assertion, hint) + } + + def fromAnalyzer(mean: Mean, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[MeanState, Double, Double](mean, assertion, hint = hint) new NamedConstraint(constraint, s"MeanConstraint($mean)") } + /** * Runs sum analysis on the given column and executes the assertion * @@ -583,13 +663,16 @@ object Constraint { val sum = Sum(column, where) + fromAnalyzer(sum, assertion, hint) + } + + def fromAnalyzer(sum: Sum, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[SumState, Double, Double](sum, assertion, hint = hint) new NamedConstraint(constraint, s"SumConstraint($sum)") } - /** * Runs standard deviation analysis on the given column and executes the assertion * @@ -606,6 +689,14 @@ object Constraint { val standardDeviation = StandardDeviation(column, where) + fromAnalyzer(standardDeviation, assertion, hint) + } + + def fromAnalyzer( + standardDeviation: StandardDeviation, + assertion: Double => Boolean, + hint: Option[String]) + : Constraint = { val constraint = AnalysisBasedConstraint[StandardDeviationState, Double, Double]( standardDeviation, assertion, hint = hint) @@ -628,6 +719,14 @@ object Constraint { val approxCountDistinct = ApproxCountDistinct(column, where) + fromAnalyzer(approxCountDistinct, assertion, hint) + } + + def fromAnalyzer( + approxCountDistinct: ApproxCountDistinct, + assertion: Double => Boolean, + hint: Option[String]) + : Constraint = { val constraint = AnalysisBasedConstraint[ApproxCountDistinctState, Double, Double]( approxCountDistinct, assertion, hint = hint) @@ -652,6 +751,10 @@ object Constraint { val correlation = Correlation(columnA, columnB, where) + fromAnalyzer(correlation, assertion, hint) + } + + def fromAnalyzer(correlation: Correlation, assertion: Double => Boolean, hint: Option[String]): Constraint = { val constraint = AnalysisBasedConstraint[CorrelationState, Double, Double]( correlation, assertion, hint = hint) @@ -711,7 +814,11 @@ object Constraint { val kllSketch = KLLSketch(column, kllParameters = kllParameters) - val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution] ( + fromAnalyzer(kllSketch, assertion, hint) + } + + def fromAnalyzer(kllSketch: KLLSketch, assertion: BucketDistribution => Boolean, hint: Option[String]): Constraint = { + val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution]( kllSketch, assertion, hint = hint) new NamedConstraint(constraint, s"kllSketchConstraint($kllSketch)") diff --git a/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala b/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala index b7657ef29..f8188165c 100644 --- a/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala +++ b/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala @@ -17,8 +17,13 @@ package com.amazon.deequ.constraints import com.amazon.deequ.SparkContextSpec +import com.amazon.deequ.VerificationSuite import com.amazon.deequ.analyzers._ import com.amazon.deequ.analyzers.runners.MetricCalculationException +import com.amazon.deequ.checks.Check +import com.amazon.deequ.checks.CheckLevel +import com.amazon.deequ.checks.CheckResult +import com.amazon.deequ.checks.CheckStatus import com.amazon.deequ.constraints.ConstraintUtils.calculate import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} import com.amazon.deequ.utils.FixtureSupport @@ -74,6 +79,24 @@ class AnalysisBasedConstraintTest extends WordSpec with Matchers with SparkConte "Analysis based constraint" should { + "can convert an analyzer to a Check" in + withSparkSession { sparkSession => + val completeness = Completeness("att1") + val constraint1 = Constraint.fromAnalyzer(completeness, d => d > 1) + val size: Size = Size() + val sizeAssertion: (Long => Boolean) = d => d > 0 + val constraint2 = Constraint.fromAnalyzer(size, sizeAssertion, None) + + val check1 = Check.fromConstraint(constraint1, s"Completeness att1") + val check2 = Check.fromConstraint(constraint2, "Size") + + val df = getDfMissing(sparkSession) + + val result = new VerificationSuite().onData(df).addCheck(check1).run() + + assert(result.status == CheckStatus.Error) + } + "assert correctly on values if analysis is successful" in withSparkSession { sparkSession => val df = getDfMissing(sparkSession)