From 1c8f340bcdceffea2250adda40b3aa625813cd20 Mon Sep 17 00:00:00 2001
From: ZDQ870
Date: Tue, 17 Oct 2023 15:09:25 -0400
Subject: [PATCH 1/2] Creation of Exact Quantile Check

---
 .../deequ/analyzers/ExactQuantile.scala       | 78 +++++++++++++++++++
 .../deequ/analyzers/StateProvider.scala       |  6 ++
 .../scala/com/amazon/deequ/checks/Check.scala | 20 +++++
 .../amazon/deequ/constraints/Constraint.scala | 30 +++++++
 .../repository/AnalysisResultSerde.scala      | 12 +++
 .../com/amazon/deequ/checks/CheckTest.scala   | 16 +++-
 .../repository/AnalysisResultSerdeTest.scala  | 14 +++-
 7 files changed, 174 insertions(+), 2 deletions(-)
 create mode 100644 src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala

diff --git a/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
new file mode 100644
index 000000000..656b3cb5a
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
@@ -0,0 +1,78 @@
+package com.amazon.deequ.analyzers
+
+import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric}
+import com.amazon.deequ.analyzers.Analyzers.{conditionalSelection, ifNoNullsIn}
+import com.amazon.deequ.metrics.FullColumn
+import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.types.{DoubleType, StructType}
+
+/**
+ * State produced by the [[ExactQuantile]] analyzer: the quantile value computed by one scan.
+ *
+ * @param exactQuantile The quantile value returned by the aggregation
+ * @param quantile The quantile that was requested from the analyzer
+ * @param fullColumn The column expression the quantile was computed over, if known
+ */
+case class ExactQuantileState(
+    exactQuantile: Double,
+    quantile: Double,
+    override val fullColumn: Option[Column] = None)
+  extends DoubleValuedState[ExactQuantileState] with FullColumn {
+
+  /**
+   * Merging is not supported: the exact quantile of a combined dataset cannot be derived from
+   * the quantile values of its parts, so there is no correct state to return here.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  override def sum(other: ExactQuantileState): ExactQuantileState = {
+    throw new UnsupportedOperationException(
+      "ExactQuantile states cannot be merged; recompute the quantile on the complete data.")
+  }
+
+  override def metricValue(): Double = {
+    exactQuantile
+  }
+}
+
+/**
+ * Computes an exact quantile of a numeric column with Spark's `percentile` aggregate.
+ *
+ * @param column Column to compute the quantile on
+ * @param quantile Which quantile to compute; handed to `percentile` unchanged
+ * @param where Additional filter to apply before the analyzer is run
+ */
+case class ExactQuantile(
+    column: String,
+    quantile: Double,
+    where: Option[String] = None)
+  extends StandardScanShareableAnalyzer[ExactQuantileState]("ExactQuantile", column)
+  with FilterableAnalyzer {
+
+  override def aggregationFunctions(): Seq[Column] = {
+    // Assemble the SQL from the raw column and filter strings rather than interpolating a
+    // Column: Column.toString is a display form that is not guaranteed to parse back
+    // (e.g. string literals inside the filter can lose their quotes).
+    val selection = where
+      .map(condition => s"CASE WHEN ($condition) THEN $column END")
+      .getOrElse(column)
+
+    expr(s"percentile(CAST($selection AS DOUBLE), $quantile)") :: Nil
+  }
+
+  override def fromAggregationResult(result: Row, offset: Int): Option[ExactQuantileState] = {
+    ifNoNullsIn(result, offset) { _ =>
+      ExactQuantileState(result.getDouble(offset), quantile, Some(criterion))
+    }
+  }
+
+  override protected def additionalPreconditions(): Seq[StructType => Unit] = {
+    hasColumn(column) :: isNumeric(column) :: Nil
+  }
+
+  override def filterCondition: Option[String] = where
+
+  // Column expression recorded in the state as `fullColumn`.
+  private def criterion: Column = conditionalSelection(column, where).cast(DoubleType)
+}
diff --git a/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala b/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
index 65edb9424..b752d67c1 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
@@ -135,6 +135,9 @@ case class HdfsStateProvider(
         val serializedDigest = ApproximatePercentile.serializer.serialize(percentileDigest)
         persistBytes(serializedDigest, identifier)
 
+      case _: ExactQuantile =>
+        persistDoubleState(state.asInstanceOf[ExactQuantileState].exactQuantile, identifier)
+
       case _ =>
         throw new IllegalArgumentException(s"Unable to persist state for analyzer $analyzer.")
     }
@@ -177,6 +180,9 @@ case class HdfsStateProvider(
         val percentileDigest = ApproximatePercentile.serializer.deserialize(loadBytes(identifier))
         ApproxQuantileState(percentileDigest)
 
+      case exactQuantile: ExactQuantile =>
+        ExactQuantileState(loadDoubleState(identifier), exactQuantile.quantile)
+
       case _ =>
         throw new IllegalArgumentException(s"Unable to load state for analyzer $analyzer.")
     }
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index 66879334f..afd3a1a3b 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -506,6 +506,26 @@ case class Check(
       approxQuantileConstraint(column, quantile, assertion, filter, hint))
   }
 
+  /**
+    * Creates a constraint that asserts on an exact quantile
+    *
+    * @param column Column to run the assertion on
+    * @param quantile Which quantile to assert on
+    * @param assertion Function that receives a double input parameter (the computed quantile)
+    *                  and returns a boolean
+    * @param hint A hint to provide additional context why a constraint could have failed
+    * @return The check with the exact quantile constraint added; a filter can be set via where
+    */
+  def hasExactQuantile(column: String,
+                       quantile: Double,
+                       assertion: Double => Boolean,
+                       hint: Option[String] = None)
+    : CheckWithLastConstraintFilterable = {
+
+    addFilterableConstraint(filter =>
+      exactQuantileConstraint(column, quantile, assertion, filter, hint))
+  }
+
   /**
     * Creates a constraint that asserts on the minimum length of the column
     *
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 8bd83990e..39d8f522f 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -488,6 +488,36 @@ object Constraint {
     new NamedConstraint(constraint, s"ApproxQuantileConstraint($approxQuantile)")
   }
 
+  /**
+    * Runs exact quantile analysis on the given column and executes the assertion
+    *
+    * @param column Column to run the assertion on
+    * @param quantile Which quantile to assert on
+    * @param assertion Function that receives a double input parameter (the computed quantile)
+    *                  and returns a boolean
+    * @param where Additional filter to apply before the analyzer is run.
+    * @param hint A hint to provide additional context why a constraint could have failed
+    */
+  def exactQuantileConstraint(
+      column: String,
+      quantile: Double,
+      assertion: Double => Boolean,
+      where: Option[String] = None,
+      hint: Option[String] = None)
+    : Constraint = {
+
+    val exactQuantile = ExactQuantile(column, quantile, where = where)
+
+    fromAnalyzer(exactQuantile, assertion, hint)
+  }
+
+  def fromAnalyzer(exactQuantile: ExactQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
+    val constraint = AnalysisBasedConstraint[ExactQuantileState, Double, Double](
+      exactQuantile, assertion, hint = hint)
+
+    new NamedConstraint(constraint, s"ExactQuantileConstraint($exactQuantile)")
+  }
+
   /**
     * Runs max length analysis on the given column and executes the assertion
     *
diff --git a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
index 4a2ca1058..e9bb4f7df 100644
--- a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
+++ b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
@@ -346,6 +346,12 @@ private[deequ] object AnalyzerSerializer
         result.addProperty("quantiles", approxQuantiles.quantiles.mkString(","))
         result.addProperty("relativeError", approxQuantiles.relativeError)
 
+      case exactQuantile: ExactQuantile =>
+        result.addProperty(ANALYZER_NAME_FIELD, "ExactQuantile")
+        result.addProperty(COLUMN_FIELD, exactQuantile.column)
+        result.addProperty("quantile", exactQuantile.quantile)
+        result.addProperty(WHERE_FIELD, exactQuantile.where.orNull)
+
       case minLength: MinLength =>
         result.addProperty(ANALYZER_NAME_FIELD, "MinLength")
 
@@ -481,6 +487,12 @@ private[deequ] object AnalyzerDeserializer
         val relativeError = json.get("relativeError").getAsDouble
         ApproxQuantiles(column, quantile, relativeError)
 
+      case "ExactQuantile" =>
+        val column = json.get(COLUMN_FIELD).getAsString
+        val quantile = json.get("quantile").getAsDouble
+        val where = Option(json.get(WHERE_FIELD)).filterNot(_.isJsonNull).map(_.getAsString)
+        ExactQuantile(column, quantile, where)
+
       case "MinLength" =>
         MinLength(
           json.get(COLUMN_FIELD).getAsString,
diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
index 70e998ee5..0a46b9e25 100644
--- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
+++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
@@ -583,7 +583,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       val numericAnalysis = AnalysisRunner.onData(dfNumeric).addAnalyzers(Seq(
         Minimum("att1"), Maximum("att1"), Mean("att1"), Sum("att1"),
         StandardDeviation("att1"), ApproxCountDistinct("att1"),
-        ApproxQuantile("att1", quantile = 0.5)))
+        ApproxQuantile("att1", quantile = 0.5), ExactQuantile("att1", quantile = 0.5)))
 
       val contextNumeric = numericAnalysis.run()
 
@@ -594,6 +594,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       assertSuccess(baseCheck.hasStandardDeviation("att1", _ == 1.707825127659933), contextNumeric)
       assertSuccess(baseCheck.hasApproxCountDistinct("att1", _ == 6.0), contextNumeric)
       assertSuccess(baseCheck.hasApproxQuantile("att1", quantile = 0.5, _ == 3.0), contextNumeric)
+      assertSuccess(baseCheck.hasExactQuantile("att1", quantile = 0.5, _ == 3.5), contextNumeric)
 
       val correlationAnalysisInformative = AnalysisRunner.onData(dfInformative)
         .addAnalyzer(Correlation("att1", "att2"))
@@ -634,6 +635,19 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       assertSuccess(hasApproxQuantileCheckWithFilter, context)
     }
 
+    "correctly evaluate hasExactQuantile constraints" in withSparkSession { sparkSession =>
+      val hasExactQuantileCheck = Check(CheckLevel.Error, "a")
+        .hasExactQuantile("att1", quantile = 0.5, _ == 3.5)
+      val hasExactQuantileCheckWithFilter = Check(CheckLevel.Error, "a")
+        .hasExactQuantile("att1", quantile = 0.5, _ == 5.0).where("att2 > 0")
+
+      val context = runChecks(getDfWithNumericValues(sparkSession), hasExactQuantileCheck,
+        hasExactQuantileCheckWithFilter)
+
+      assertSuccess(hasExactQuantileCheck, context)
+      assertSuccess(hasExactQuantileCheckWithFilter, context)
+    }
+
     "yield correct results for minimum and maximum length stats" in withSparkSession { sparkSession =>
       val baseCheck = Check(CheckLevel.Error, description = "a description")
 
diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
index 7083a5c1d..6f1fa1874 100644
--- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
+++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
@@ -83,7 +83,9 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
       MinLength("ColumnA") ->
         DoubleMetric(Entity.Column, "MinLength", "ColumnA", Success(5.0)),
       MaxLength("ColumnA") ->
-        DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0))
+        DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0)),
+      ExactQuantile("ColumnA", 0.5) ->
+        DoubleMetric(Entity.Column, "ExactQuantile", "ColumnA", Success(5.0))
     ))
 
     val dateTime = LocalDate.of(2017, 10, 14).atTime(10, 10, 10)
@@ -173,6 +175,16 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
     assertCorrectlyConvertsAnalysisResults(Seq(result))
   }
 
+  "serialization of ExactQuantile" should "correctly restore it" in {
+
+    val analyzer = ExactQuantile("col", 0.5, Some("col > 0"))
+    val metric = DoubleMetric(Entity.Column, "ExactQuantile", "col", Success(0.5))
+    val context = AnalyzerContext(Map(analyzer -> metric))
+    val result = new AnalysisResult(ResultKey(0), context)
+
+    assertCorrectlyConvertsAnalysisResults(Seq(result))
+  }
+
   val histogramSumJson =
     """[
       | {
From 677de6e3b16d50fd2e861d7453dc995ffc134807 Mon Sep 17 00:00:00 2001
From: ZDQ870
Date: Thu, 26 Oct 2023 11:55:42 -0400
Subject: [PATCH 2/2] Fix build issue

---
 .../amazon/deequ/analyzers/ExactQuantile.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
index 656b3cb5a..b3ef9878e 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
@@ -1,3 +1,19 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
 package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric}