Skip to content

Commit

Permalink
Allow all DQ constraints to be generated from an Analyzer (#508)
Browse files Browse the repository at this point in the history
Co-authored-by: Yannis Mentekidis <mentekid@amazon.com>
  • Loading branch information
mentekid and yannis-mentekidis authored Oct 9, 2023
1 parent 4ce922f commit ca034c3
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,12 @@ object Check {
/** A common assertion function checking if the value is 1 */
val IsOne: Double => Boolean = { _ == 1.0 }

def fromConstraint(constraint: Constraint,
description: String,
checkLevel: CheckLevel.Value = CheckLevel.Error): Check = {
Check(checkLevel, description, constraints = Seq(constraint))
}

/**
* Common assertion function checking if the value can be considered as normal (that no
* anomalies were detected), given the anomaly detection strategy and details on how to retrieve
Expand Down
121 changes: 114 additions & 7 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ object Constraint {

val size = Size(where)

fromAnalyzer(size, assertion, hint)
}

def fromAnalyzer(size: Size, assertion: Long => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatches, Double, Long](size,
assertion, Some(_.toLong), hint)

Expand Down Expand Up @@ -194,10 +198,16 @@ object Constraint {

val completeness = Completeness(column, where)

this.fromAnalyzer(completeness, assertion, hint)
}

def fromAnalyzer(completeness: Completeness,
assertion: Double => Boolean,
hint: Option[String] = None): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
completeness, assertion, hint = hint)

new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-$column")
new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-${completeness.column}")
}

/**
Expand Down Expand Up @@ -238,12 +248,16 @@ object Constraint {

val uniqueness = Uniqueness(columns, where)

fromAnalyzer(uniqueness, assertion, hint)
}

def fromAnalyzer(uniqueness: Uniqueness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
uniqueness, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
s"UniquenessConstraint($uniqueness)",
columns)
uniqueness.columns)
}

/**
Expand All @@ -264,6 +278,10 @@ object Constraint {

val distinctness = Distinctness(columns, where)

fromAnalyzer(distinctness, assertion, hint)
}

def fromAnalyzer(distinctness: Distinctness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
distinctness, assertion, hint = hint)

Expand All @@ -287,12 +305,20 @@ object Constraint {
: Constraint = {

val uniqueValueRatio = UniqueValueRatio(columns, where)
fromAnalyzer(uniqueValueRatio, assertion, hint)
}

def fromAnalyzer(
uniqueValueRatio: UniqueValueRatio,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
uniqueValueRatio, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
s"UniqueValueRatioConstraint($uniqueValueRatio",
columns)
uniqueValueRatio.columns)
}

/**
Expand All @@ -314,15 +340,18 @@ object Constraint {

val compliance = Compliance(name, column, where, columns)

fromAnalyzer(compliance, assertion, hint)
}

private def fromAnalyzer(compliance: Compliance, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
compliance, assertion, hint = hint)

// new NamedConstraint(constraint, s"ComplianceConstraint($compliance)")
val sparkAssertion = org.apache.spark.sql.functions.udf(assertion)
new RowLevelAssertedConstraint(
constraint,
s"ComplianceConstraint($compliance)",
s"ColumnsCompliance-$column",
s"ColumnsCompliance-${compliance.predicate}",
sparkAssertion)
}

Expand All @@ -346,6 +375,16 @@ object Constraint {

val patternMatch = PatternMatch(column, pattern, where)

fromAnalyzer(patternMatch, pattern, assertion, name, hint)
}

def fromAnalyzer(
patternMatch: PatternMatch,
pattern: Regex,
assertion: Double => Boolean,
name: Option[String],
hint: Option[String]): Constraint = {
val column: String = patternMatch.column
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
patternMatch, assertion, hint = hint)

Expand Down Expand Up @@ -375,6 +414,10 @@ object Constraint {

val entropy = Entropy(column, where)

fromAnalyzer(entropy, assertion, hint)
}

def fromAnalyzer(entropy: Entropy, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
entropy, assertion, hint = hint)

Expand All @@ -401,6 +444,14 @@ object Constraint {

val mutualInformation = MutualInformation(Seq(columnA, columnB), where)

fromAnalyzer(mutualInformation, assertion, hint)
}

def fromAnalyzer(
mutualInformation: MutualInformation,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
mutualInformation, assertion, hint = hint)

Expand All @@ -427,6 +478,10 @@ object Constraint {

val approxQuantile = ApproxQuantile(column, quantile, where = where)

fromAnalyzer(approxQuantile, assertion, hint)
}

def fromAnalyzer(approxQuantile: ApproxQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[ApproxQuantileState, Double, Double](
approxQuantile, assertion, hint = hint)

Expand All @@ -450,6 +505,11 @@ object Constraint {

val maxLength = MaxLength(column, where, analyzerOptions)

fromAnalyzer(maxLength, assertion, hint)
}

def fromAnalyzer(maxLength: MaxLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maxLength.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
hint = hint)

Expand Down Expand Up @@ -479,6 +539,11 @@ object Constraint {

val minLength = MinLength(column, where, analyzerOptions)

fromAnalyzer(minLength, assertion, hint)
}

def fromAnalyzer(minLength: MinLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minLength.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion,
hint = hint)

Expand Down Expand Up @@ -508,6 +573,11 @@ object Constraint {

val minimum = Minimum(column, where)

fromAnalyzer(minimum, assertion, hint)
}

def fromAnalyzer(minimum: Minimum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minimum.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minimum, assertion,
hint = hint)

Expand Down Expand Up @@ -535,6 +605,11 @@ object Constraint {

val maximum = Maximum(column, where)

fromAnalyzer(maximum, assertion, hint)
}

def fromAnalyzer(maximum: Maximum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maximum.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maximum, assertion,
hint = hint)

Expand Down Expand Up @@ -562,11 +637,16 @@ object Constraint {

val mean = Mean(column, where)

fromAnalyzer(mean, assertion, hint)
}

def fromAnalyzer(mean: Mean, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[MeanState, Double, Double](mean, assertion,
hint = hint)

new NamedConstraint(constraint, s"MeanConstraint($mean)")
}

/**
* Runs sum analysis on the given column and executes the assertion
*
Expand All @@ -583,13 +663,16 @@ object Constraint {

val sum = Sum(column, where)

fromAnalyzer(sum, assertion, hint)
}

def fromAnalyzer(sum: Sum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[SumState, Double, Double](sum, assertion,
hint = hint)

new NamedConstraint(constraint, s"SumConstraint($sum)")
}


/**
* Runs standard deviation analysis on the given column and executes the assertion
*
Expand All @@ -606,6 +689,14 @@ object Constraint {

val standardDeviation = StandardDeviation(column, where)

fromAnalyzer(standardDeviation, assertion, hint)
}

def fromAnalyzer(
standardDeviation: StandardDeviation,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[StandardDeviationState, Double, Double](
standardDeviation, assertion, hint = hint)

Expand All @@ -628,6 +719,14 @@ object Constraint {

val approxCountDistinct = ApproxCountDistinct(column, where)

fromAnalyzer(approxCountDistinct, assertion, hint)
}

def fromAnalyzer(
approxCountDistinct: ApproxCountDistinct,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[ApproxCountDistinctState, Double, Double](
approxCountDistinct, assertion, hint = hint)

Expand All @@ -652,6 +751,10 @@ object Constraint {

val correlation = Correlation(columnA, columnB, where)

fromAnalyzer(correlation, assertion, hint)
}

def fromAnalyzer(correlation: Correlation, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[CorrelationState, Double, Double](
correlation, assertion, hint = hint)

Expand Down Expand Up @@ -711,7 +814,11 @@ object Constraint {

val kllSketch = KLLSketch(column, kllParameters = kllParameters)

val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution] (
fromAnalyzer(kllSketch, assertion, hint)
}

def fromAnalyzer(kllSketch: KLLSketch, assertion: BucketDistribution => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution](
kllSketch, assertion, hint = hint)

new NamedConstraint(constraint, s"kllSketchConstraint($kllSketch)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
package com.amazon.deequ.constraints

import com.amazon.deequ.SparkContextSpec
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.MetricCalculationException
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckLevel
import com.amazon.deequ.checks.CheckResult
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.constraints.ConstraintUtils.calculate
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
import com.amazon.deequ.utils.FixtureSupport
Expand Down Expand Up @@ -74,6 +79,24 @@ class AnalysisBasedConstraintTest extends WordSpec with Matchers with SparkConte

"Analysis based constraint" should {

"can convert an analyzer to a Check" in
withSparkSession { sparkSession =>
val completeness = Completeness("att1")
val constraint1 = Constraint.fromAnalyzer(completeness, d => d > 1)
val size: Size = Size()
val sizeAssertion: (Long => Boolean) = d => d > 0
val constraint2 = Constraint.fromAnalyzer(size, sizeAssertion, None)

val check1 = Check.fromConstraint(constraint1, s"Completeness att1")
val check2 = Check.fromConstraint(constraint2, "Size")

val df = getDfMissing(sparkSession)

val result = new VerificationSuite().onData(df).addCheck(check1).run()

assert(result.status == CheckStatus.Error)
}

"assert correctly on values if analysis is successful" in
withSparkSession { sparkSession =>
val df = getDfMissing(sparkSession)
Expand Down

0 comments on commit ca034c3

Please sign in to comment.