Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow all DQ constraints to be generated from an Analyzer #508

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,12 @@ object Check {
/** A common assertion function checking if the value is 1 */
val IsOne: Double => Boolean = { _ == 1.0 }

def fromConstraint(constraint: Constraint,
description: String,
checkLevel: CheckLevel.Value = CheckLevel.Error): Check = {
Check(checkLevel, description, constraints = Seq(constraint))
}

/**
* Common assertion function checking if the value can be considered as normal (that no
* anomalies were detected), given the anomaly detection strategy and details on how to retrieve
Expand Down
121 changes: 114 additions & 7 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ object Constraint {

val size = Size(where)

fromAnalyzer(size, assertion, hint)
}

def fromAnalyzer(size: Size, assertion: Long => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatches, Double, Long](size,
assertion, Some(_.toLong), hint)

Expand Down Expand Up @@ -194,10 +198,16 @@ object Constraint {

val completeness = Completeness(column, where)

this.fromAnalyzer(completeness, assertion, hint)
}

def fromAnalyzer(completeness: Completeness,
assertion: Double => Boolean,
hint: Option[String] = None): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
completeness, assertion, hint = hint)

new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-$column")
new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-${completeness.column}")
}

/**
Expand Down Expand Up @@ -238,12 +248,16 @@ object Constraint {

val uniqueness = Uniqueness(columns, where)

fromAnalyzer(uniqueness, assertion, hint)
}

def fromAnalyzer(uniqueness: Uniqueness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
uniqueness, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
s"UniquenessConstraint($uniqueness)",
columns)
uniqueness.columns)
}

/**
Expand All @@ -264,6 +278,10 @@ object Constraint {

val distinctness = Distinctness(columns, where)

fromAnalyzer(distinctness, assertion, hint)
}

def fromAnalyzer(distinctness: Distinctness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
distinctness, assertion, hint = hint)

Expand All @@ -287,12 +305,20 @@ object Constraint {
: Constraint = {

val uniqueValueRatio = UniqueValueRatio(columns, where)
fromAnalyzer(uniqueValueRatio, assertion, hint)
}

def fromAnalyzer(
uniqueValueRatio: UniqueValueRatio,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
uniqueValueRatio, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
s"UniqueValueRatioConstraint($uniqueValueRatio",
columns)
uniqueValueRatio.columns)
}

/**
Expand All @@ -314,15 +340,18 @@ object Constraint {

val compliance = Compliance(name, column, where, columns)

fromAnalyzer(compliance, assertion, hint)
}

private def fromAnalyzer(compliance: Compliance, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
compliance, assertion, hint = hint)

// new NamedConstraint(constraint, s"ComplianceConstraint($compliance)")
val sparkAssertion = org.apache.spark.sql.functions.udf(assertion)
new RowLevelAssertedConstraint(
constraint,
s"ComplianceConstraint($compliance)",
s"ColumnsCompliance-$column",
s"ColumnsCompliance-${compliance.predicate}",
sparkAssertion)
}

Expand All @@ -346,6 +375,16 @@ object Constraint {

val patternMatch = PatternMatch(column, pattern, where)

fromAnalyzer(patternMatch, pattern, assertion, name, hint)
}

def fromAnalyzer(
patternMatch: PatternMatch,
pattern: Regex,
assertion: Double => Boolean,
name: Option[String],
hint: Option[String]): Constraint = {
val column: String = patternMatch.column
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
patternMatch, assertion, hint = hint)

Expand Down Expand Up @@ -375,6 +414,10 @@ object Constraint {

val entropy = Entropy(column, where)

fromAnalyzer(entropy, assertion, hint)
}

def fromAnalyzer(entropy: Entropy, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
entropy, assertion, hint = hint)

Expand All @@ -401,6 +444,14 @@ object Constraint {

val mutualInformation = MutualInformation(Seq(columnA, columnB), where)

fromAnalyzer(mutualInformation, assertion, hint)
}

def fromAnalyzer(
mutualInformation: MutualInformation,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
mutualInformation, assertion, hint = hint)

Expand All @@ -427,6 +478,10 @@ object Constraint {

val approxQuantile = ApproxQuantile(column, quantile, where = where)

fromAnalyzer(approxQuantile, assertion, hint)
}

def fromAnalyzer(approxQuantile: ApproxQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[ApproxQuantileState, Double, Double](
approxQuantile, assertion, hint = hint)

Expand All @@ -450,6 +505,11 @@ object Constraint {

val maxLength = MaxLength(column, where, analyzerOptions)

fromAnalyzer(maxLength, assertion, hint)
}

def fromAnalyzer(maxLength: MaxLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maxLength.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
hint = hint)

Expand Down Expand Up @@ -479,6 +539,11 @@ object Constraint {

val minLength = MinLength(column, where, analyzerOptions)

fromAnalyzer(minLength, assertion, hint)
}

def fromAnalyzer(minLength: MinLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minLength.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion,
hint = hint)

Expand Down Expand Up @@ -508,6 +573,11 @@ object Constraint {

val minimum = Minimum(column, where)

fromAnalyzer(minimum, assertion, hint)
}

def fromAnalyzer(minimum: Minimum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minimum.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minimum, assertion,
hint = hint)

Expand Down Expand Up @@ -535,6 +605,11 @@ object Constraint {

val maximum = Maximum(column, where)

fromAnalyzer(maximum, assertion, hint)
}

def fromAnalyzer(maximum: Maximum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maximum.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maximum, assertion,
hint = hint)

Expand Down Expand Up @@ -562,11 +637,16 @@ object Constraint {

val mean = Mean(column, where)

fromAnalyzer(mean, assertion, hint)
}

def fromAnalyzer(mean: Mean, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[MeanState, Double, Double](mean, assertion,
hint = hint)

new NamedConstraint(constraint, s"MeanConstraint($mean)")
}

/**
* Runs sum analysis on the given column and executes the assertion
*
Expand All @@ -583,13 +663,16 @@ object Constraint {

val sum = Sum(column, where)

fromAnalyzer(sum, assertion, hint)
}

def fromAnalyzer(sum: Sum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[SumState, Double, Double](sum, assertion,
hint = hint)

new NamedConstraint(constraint, s"SumConstraint($sum)")
}


/**
* Runs standard deviation analysis on the given column and executes the assertion
*
Expand All @@ -606,6 +689,14 @@ object Constraint {

val standardDeviation = StandardDeviation(column, where)

fromAnalyzer(standardDeviation, assertion, hint)
}

def fromAnalyzer(
standardDeviation: StandardDeviation,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[StandardDeviationState, Double, Double](
standardDeviation, assertion, hint = hint)

Expand All @@ -628,6 +719,14 @@ object Constraint {

val approxCountDistinct = ApproxCountDistinct(column, where)

fromAnalyzer(approxCountDistinct, assertion, hint)
}

def fromAnalyzer(
approxCountDistinct: ApproxCountDistinct,
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[ApproxCountDistinctState, Double, Double](
approxCountDistinct, assertion, hint = hint)

Expand All @@ -652,6 +751,10 @@ object Constraint {

val correlation = Correlation(columnA, columnB, where)

fromAnalyzer(correlation, assertion, hint)
}

def fromAnalyzer(correlation: Correlation, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[CorrelationState, Double, Double](
correlation, assertion, hint = hint)

Expand Down Expand Up @@ -711,7 +814,11 @@ object Constraint {

val kllSketch = KLLSketch(column, kllParameters = kllParameters)

val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution] (
fromAnalyzer(kllSketch, assertion, hint)
}

def fromAnalyzer(kllSketch: KLLSketch, assertion: BucketDistribution => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution](
kllSketch, assertion, hint = hint)

new NamedConstraint(constraint, s"kllSketchConstraint($kllSketch)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
package com.amazon.deequ.constraints

import com.amazon.deequ.SparkContextSpec
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.MetricCalculationException
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckLevel
import com.amazon.deequ.checks.CheckResult
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.constraints.ConstraintUtils.calculate
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
import com.amazon.deequ.utils.FixtureSupport
Expand Down Expand Up @@ -74,6 +79,24 @@ class AnalysisBasedConstraintTest extends WordSpec with Matchers with SparkConte

"Analysis based constraint" should {

"can convert an analyzer to a Check" in
withSparkSession { sparkSession =>
val completeness = Completeness("att1")
val constraint1 = Constraint.fromAnalyzer(completeness, d => d > 1)
val size: Size = Size()
val sizeAssertion: (Long => Boolean) = d => d > 0
val constraint2 = Constraint.fromAnalyzer(size, sizeAssertion, None)

val check1 = Check.fromConstraint(constraint1, s"Completeness att1")
val check2 = Check.fromConstraint(constraint2, "Size")

val df = getDfMissing(sparkSession)

val result = new VerificationSuite().onData(df).addCheck(check1).run()

assert(result.status == CheckStatus.Error)
}

"assert correctly on values if analysis is successful" in
withSparkSession { sparkSession =>
val df = getDfMissing(sparkSession)
Expand Down