diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
index b3e44c4c..a8040582 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -19,17 +19,17 @@ package com.amazon.deequ.analyzers
 import com.amazon.deequ.analyzers.Analyzers._
 import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.runners._
-import com.amazon.deequ.metrics.FullColumn
-import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
 import com.amazon.deequ.metrics.DoubleMetric
 import com.amazon.deequ.metrics.Entity
+import com.amazon.deequ.metrics.FullColumn
 import com.amazon.deequ.metrics.Metric
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
+import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
 
 import scala.language.existentials
 import scala.util.Failure
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
new file mode 100644
index 00000000..1d7e3753
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.analyzers
+
+import com.amazon.deequ.analyzers.Analyzers.metricFromFailure
+import com.amazon.deequ.comparison.DataSynchronization
+import com.amazon.deequ.comparison.DataSynchronizationFailed
+import com.amazon.deequ.comparison.DataSynchronizationSucceeded
+import com.amazon.deequ.metrics.DoubleMetric
+import com.amazon.deequ.metrics.Entity
+import org.apache.spark.sql.DataFrame
+
+import scala.util.Failure
+import scala.util.Try
+
+
+/**
+ * An Analyzer for Deequ that performs a data synchronization check between two DataFrames.
+ * It evaluates the degree of synchronization based on specified column mappings and an assertion function.
+ *
+ * The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric.
+ * Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for the DataSynchronization implementation.
+ *
+ * @param dfToCompare    The DataFrame to compare with the primary DataFrame that is set up
+ *                       during [[com.amazon.deequ.VerificationSuite.onData]].
+ * @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
+ *                       and its corresponding column in dfToCompare.
+ * @param assertion      A function that takes a Double (the match ratio) and returns a Boolean.
+ *                       It defines the condition for successful synchronization.
+ *
+ * Usage:
+ * This analyzer runs as part of Deequ's VerificationSuite whenever an `isDataSynchronized` check is defined,
+ * and it can also be used manually.
+ *
+ * Example:
+ * val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
+ * val verificationResult = VerificationSuite().onData(df).addAnalyzer(analyzer).run()
+ *
+ * // or, equivalently, through the check API
+ * val verificationResult = VerificationSuite().onData(df).isDataSynchronized(dfToCompare, Map("col1" -> "col2"),
+ *   _ > 0.8).run()
+ *
+ *
+ * The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two
+ * DataFrames.
+ * The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio.
+ *
+ */
+case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
+                                       columnMappings: Map[String, String],
+                                       assertion: Double => Boolean)
+  extends Analyzer[DataSynchronizationState, DoubleMetric] {
+
+  override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {
+
+    val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)
+
+    result match {
+      case succeeded: DataSynchronizationSucceeded =>
+        Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount))
+      case failed: DataSynchronizationFailed =>
+        Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
+      case _ => None
+    }
+  }
+
+  override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {
+
+    val metric = state match {
+      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
+      case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
+    }
+
+    DoubleMetric(Entity.Dataset, "DataSynchronization", "", metric, None)
+  }
+
+  override private[deequ] def toFailureMetric(failure: Exception) =
+    metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
+}
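For reference, the analyzer can also be driven by hand, outside of a VerificationSuite. A minimal sketch of the two-step computeStateFrom/computeMetricFrom flow described above (the input frames and the `id` key column are hypothetical):

    import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
    import org.apache.spark.sql.DataFrame

    // Ratio of rows in `df` that find a matching row in `dfToCompare`, or NaN if no state was computed.
    def syncRatio(df: DataFrame, dfToCompare: DataFrame): Double = {
      val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("id" -> "id"), _ > 0.8)
      val state = analyzer.computeStateFrom(df)      // Option[DataSynchronizationState]
      val metric = analyzer.computeMetricFrom(state) // DoubleMetric wrapping a Try[Double]
      metric.value.getOrElse(Double.NaN)
    }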
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
new file mode 100644
index 00000000..e0321df3
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.analyzers
+
+/**
+ * Represents the state of data synchronization between two DataFrames in Deequ.
+ * This state keeps track of the synchronized record count and the total record count.
+ * It is used to calculate a ratio of synchronization, which is a measure of how well the data
+ * in the two DataFrames is synchronized.
+ *
+ * @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
+ * @param totalDataCount        The total count of records under comparison.
+ *
+ * The `sum` method allows for aggregation of this state with another, combining the counts from both states.
+ * This is useful in distributed computations where states from different partitions need to be aggregated.
+ *
+ * The `metricValue` method computes the synchronization ratio, i.e. the ratio of `synchronizedDataCount`
+ * to `totalDataCount`.
+ * If `totalDataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
+ * to indicate the undefined state.
+ *
+ */
+case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long)
+  extends DoubleValuedState[DataSynchronizationState] {
+  override def sum(other: DataSynchronizationState): DataSynchronizationState = {
+    DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount)
+  }
+
+  override def metricValue(): Double = {
+    if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble
+  }
+}
+
+object DataSynchronizationState
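A quick illustration of the aggregation semantics, with made-up counts:

    import com.amazon.deequ.analyzers.DataSynchronizationState

    // Partial states, e.g. computed for different partitions of the data.
    val s1 = DataSynchronizationState(synchronizedDataCount = 8, totalDataCount = 10)
    val s2 = DataSynchronizationState(synchronizedDataCount = 2, totalDataCount = 5)

    val merged = s1.sum(s2)           // DataSynchronizationState(10, 15)
    val ratio  = merged.metricValue() // 10.0 / 15.0 = 0.666...
    val empty  = DataSynchronizationState(0, 0).metricValue() // Double.NaN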
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index afd3a1a3..9f6f6ea0 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -16,17 +16,29 @@
 
 package com.amazon.deequ.checks
 
-import com.amazon.deequ.analyzers.AnalyzerOptions
-import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
-import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
+import com.amazon.deequ.analyzers.Analyzer
+import com.amazon.deequ.analyzers.AnalyzerOptions
+import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
+import com.amazon.deequ.analyzers.DataSynchronizationState
+import com.amazon.deequ.analyzers.Histogram
+import com.amazon.deequ.analyzers.KLLParameters
+import com.amazon.deequ.analyzers.Patterns
+import com.amazon.deequ.analyzers.State
+import com.amazon.deequ.anomalydetection.HistoryUtils
+import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
+import com.amazon.deequ.anomalydetection.AnomalyDetector
+import com.amazon.deequ.anomalydetection.DataPoint
+import com.amazon.deequ.checks.ColumnCondition.isAnyNotNull
+import com.amazon.deequ.checks.ColumnCondition.isEachNotNull
 import com.amazon.deequ.constraints.Constraint._
 import com.amazon.deequ.constraints._
-import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
+import com.amazon.deequ.metrics.BucketDistribution
+import com.amazon.deequ.metrics.Distribution
+import com.amazon.deequ.metrics.Metric
 import com.amazon.deequ.repository.MetricsRepository
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.expressions.UserDefinedFunction
-import com.amazon.deequ.anomalydetection.HistoryUtils
-import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}
 
 import scala.util.matching.Regex
@@ -338,6 +350,59 @@ case class Check(
       uniqueValueRatioConstraint(columns, assertion, filter, hint)
     }
   }
+  /**
+   * Performs a data synchronization check between the base DataFrame supplied to
+   * [[com.amazon.deequ.VerificationSuite.onData]] and another DataFrame supplied to this check, using Deequ's
+   * [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
+   * This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
+   *
+   * Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
+   * and the constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
+   *
+   * Usage:
+   * To use this method, create a VerificationSuite and invoke this method as part of adding checks:
+   * {{{
+   *   val baseDataFrame: DataFrame = ...
+   *   val otherDataFrame: DataFrame = ...
+   *   val columnMappings: Map[String, String] = Map("baseCol1" -> "otherCol1", "baseCol2" -> "otherCol2")
+   *   val assertionFunction: Double => Boolean = _ > 0.7
+   *
+   *   val check = new Check(CheckLevel.Error, "Data Synchronization Check")
+   *     .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction)
+   *
+   *   val verificationResult = VerificationSuite()
+   *     .onData(baseDataFrame)
+   *     .addCheck(check)
+   *     .run()
+   * }}}
+   *
+   * This will add a data synchronization check to the VerificationSuite, comparing the specified columns of
+   * baseDataFrame and otherDataFrame based on the provided assertion function.
+   *
+   *
+   * @param otherDf        The DataFrame to be compared with the current one. Analyzed in conjunction with the
+   *                       current DataFrame to assess data synchronization.
+   * @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
+   *                       Keys represent column names in the current DataFrame,
+   *                       and values are the corresponding column names in otherDf.
+   * @param assertion      A function that takes a Double (the result of the comparison) and returns a Boolean.
+   *                       Defines the condition under which the data in both DataFrames is considered synchronized.
+   *                       For example, (_ > 0.7) requires the metric value to exceed 0.7, i.e. over 70% of records.
+   * @param hint           Optional. Additional context or information about the synchronization check,
+   *                       helpful for understanding the intent or specifics of the check. Default is None.
+   * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
+   *         of the synchronization check. This object can be used in Deequ's verification suite to
+   *         assert data quality constraints.
+   *
+   */
+  def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
+                         hint: Option[String] = None): Check = {
+    val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
+    val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion,
+      hint = hint)
+    addConstraint(constraint)
+  }
+
   /**
    * Creates a constraint that asserts on the number of distinct values a column has.
    *
@@ -1092,7 +1157,9 @@ case class Check(
       case nc: ConstraintDecorator => nc.inner
       case c: Constraint => c
     }
-      .collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
+      .collect {
+        case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
+      }
       .map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
       .toSet
   }
diff --git a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala
index ec2c6965..67b4d4b4 100644
--- a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala
+++ b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -17,5 +17,10 @@
 package com.amazon.deequ.comparison
 
 sealed trait ComparisonResult
-case class ComparisonFailed(errorMessage: String) extends ComparisonResult
-case class ComparisonSucceeded() extends ComparisonResult
+
+case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
+case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult
+
+case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
+                                     totalCount: Option[Long] = None) extends ComparisonResult
+case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult
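Because ComparisonResult is sealed, callers can match exhaustively on the outcome; a small sketch of consuming the new result types:

    import com.amazon.deequ.comparison._

    def describe(result: ComparisonResult): String = result match {
      case DataSynchronizationSucceeded(passed, total) =>
        s"in sync: $passed of $total rows matched"
      case DataSynchronizationFailed(error, passed, total) =>
        s"not in sync: $error (passed = $passed, total = $total)"
      case ComparisonSucceeded(ratio) =>
        s"comparison passed with ratio $ratio"
      case ComparisonFailed(error, ratio) =>
        s"comparison failed: $error (ratio $ratio)"
    }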
diff --git a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala
index c5a82f76..992dc48d 100644
--- a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala
+++ b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -16,7 +16,8 @@
 
 package com.amazon.deequ.comparison
 
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.functions.hash
 import org.apache.spark.sql.functions.lit
@@ -101,13 +102,13 @@ object DataSynchronization extends ComparisonBase {
       val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _))
 
       if (!nonKeyColsMatch) {
-        ComparisonFailed("Non key columns in the given data frames do not match.")
+        DataSynchronizationFailed("Non key columns in the given data frames do not match.")
       } else {
         val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap
         finalAssertion(ds1, ds2, mergedMaps, assertion)
       }
     } else {
-      ComparisonFailed(columnErrors.get)
+      DataSynchronizationFailed(columnErrors.get)
     }
   }
@@ -137,17 +138,17 @@ object DataSynchronization extends ComparisonBase {
       val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _))
 
       if (nonKeyColumns1NotInDataset.nonEmpty) {
-        ComparisonFailed(s"The following columns were not found in the first dataset: " +
+        DataSynchronizationFailed(s"The following columns were not found in the first dataset: " +
           s"${nonKeyColumns1NotInDataset.mkString(", ")}")
       } else if (nonKeyColumns2NotInDataset.nonEmpty) {
-        ComparisonFailed(s"The following columns were not found in the second dataset: " +
+        DataSynchronizationFailed(s"The following columns were not found in the second dataset: " +
           s"${nonKeyColumns2NotInDataset.mkString(", ")}")
       } else {
         val mergedMaps = colKeyMap ++ compCols
         finalAssertion(ds1, ds2, mergedMaps, assertion)
       }
     } else {
-      ComparisonFailed(keyColumnErrors.get)
+      DataSynchronizationFailed(keyColumnErrors.get)
     }
   }
@@ -155,23 +156,24 @@ object DataSynchronization extends ComparisonBase {
                          ds2: DataFrame,
                          colKeyMap: Map[String, String],
                          optionalCompCols: Option[Map[String, String]] = None,
-                         optionalOutcomeColumnName: Option[String] = None): Either[ComparisonFailed, DataFrame] = {
+                         optionalOutcomeColumnName: Option[String] = None):
+  Either[DataSynchronizationFailed, DataFrame] = {
     val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
     if (columnErrors.isEmpty) {
-      val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
+      val compColsEither: Either[DataSynchronizationFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
         optionalCompCols.get match {
-          case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided."))
+          case compCols if compCols.isEmpty => Left(DataSynchronizationFailed("Empty column comparison map provided."))
           case compCols =>
             val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
             val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _))
             if (ds1CompColsNotInDataset.nonEmpty) {
               Left(
-                ComparisonFailed(s"The following columns were not found in the first dataset: " +
+                DataSynchronizationFailed(s"The following columns were not found in the first dataset: " +
                   s"${ds1CompColsNotInDataset.mkString(", ")}")
               )
             } else if (ds2CompColsNotInDataset.nonEmpty) {
               Left(
-                ComparisonFailed(s"The following columns were not found in the second dataset: " +
+                DataSynchronizationFailed(s"The following columns were not found in the second dataset: " +
                   s"${ds2CompColsNotInDataset.mkString(", ")}")
               )
             } else {
@@ -184,7 +186,7 @@ object DataSynchronization extends ComparisonBase {
           val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _))
           if (!nonKeyColsMatch) {
-            Left(ComparisonFailed("Non key columns in the given data frames do not match."))
+            Left(DataSynchronizationFailed("Non key columns in the given data frames do not match."))
           } else {
             Right(ds1NonKeyCols.map { c => c -> c}.toMap)
           }
@@ -196,11 +198,11 @@ object DataSynchronization extends ComparisonBase {
         case Success(df) => Right(df)
         case Failure(ex) =>
           ex.printStackTrace()
-          Left(ComparisonFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
+          Left(DataSynchronizationFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
       }
     }
   } else {
-    Left(ComparisonFailed(columnErrors.get))
+    Left(DataSynchronizationFailed(columnErrors.get))
   }
 }
@@ -253,19 +255,22 @@ object DataSynchronization extends ComparisonBase {
     val ds2Count = ds2.count()
 
     if (ds1Count != ds2Count) {
-      ComparisonFailed(s"The row counts of the two data frames do not match.")
+      DataSynchronizationFailed(s"The row counts of the two data frames do not match.")
    } else {
       val joinExpression: Column = mergedMaps
         .map { case (col1, col2) => ds1(col1) === ds2(col2)}
         .reduce((e1, e2) => e1 && e2)
 
       val joined = ds1.join(ds2, joinExpression, "inner")
-      val ratio = joined.count().toDouble / ds1Count
+      val passedCount = joined.count()
+      val totalCount = ds1Count
+      val ratio = passedCount.toDouble / totalCount.toDouble
 
       if (assertion(ratio)) {
-        ComparisonSucceeded()
+        DataSynchronizationSucceeded(passedCount, totalCount)
       } else {
-        ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
+        DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the " +
+          s"constraint requirement.", Some(passedCount), Some(totalCount))
      }
    }
  }
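At its core, finalAssertion reduces to a key-equality inner join between the two frames; stripped of the result wrapping, the ratio computation is equivalent to this sketch (which assumes the equal-row-count precondition above has already been checked):

    import org.apache.spark.sql.{Column, DataFrame}

    // Fraction of ds1 rows that find a matching ds2 row under the merged column mapping.
    def matchRatio(ds1: DataFrame, ds2: DataFrame, mergedMaps: Map[String, String]): Double = {
      val joinExpression: Column = mergedMaps
        .map { case (col1, col2) => ds1(col1) === ds2(col2) }
        .reduce(_ && _)
      val passedCount = ds1.join(ds2, joinExpression, "inner").count()
      passedCount.toDouble / ds1.count().toDouble
    }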
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 39d8f522..5bb8d477 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -17,9 +17,13 @@
 
 package com.amazon.deequ.constraints
 
 import com.amazon.deequ.analyzers._
-import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
+import com.amazon.deequ.metrics.BucketDistribution
+import com.amazon.deequ.metrics.Distribution
+import com.amazon.deequ.metrics.Metric
 import org.apache.spark.sql.expressions.UserDefinedFunction
 
+import scala.util.Failure
+import scala.util.Success
 import scala.util.matching.Regex
 
 object ConstraintStatus extends Enumeration {
@@ -897,3 +901,30 @@ object Constraint {
   }
 
 }
+
+/**
+ * Data Synchronization Constraint
+ * @param analyzer The data synchronization analyzer whose metric this constraint evaluates.
+ * @param hint     An optional hint to attach to the constraint result.
+ */
+case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer, hint: Option[String])
+  extends Constraint {
+
+  override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {
+
+    metrics.collectFirst {
+      case (_: DataSynchronizationAnalyzer, metric: Metric[Double]) => metric
+    } match {
+      case Some(metric) =>
+        val result = metric.value match {
+          case Success(value) => analyzer.assertion(value)
+          case Failure(_) => false
+        }
+        val status = if (result) ConstraintStatus.Success else ConstraintStatus.Failure
+        ConstraintResult(this, status, hint, Some(metric))
+
+      case None =>
+        ConstraintResult(this, ConstraintStatus.Failure, hint, None)
+    }
+  }
+}
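A sketch of how the constraint resolves against the metrics map of a completed run; the DataFrame, the `id` mapping, and the 0.9 metric value are made up, and the cast mirrors the one used in Check.scala above:

    import com.amazon.deequ.analyzers.{Analyzer, DataSynchronizationAnalyzer}
    import com.amazon.deequ.constraints.{ConstraintStatus, DataSynchronizationConstraint}
    import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
    import org.apache.spark.sql.DataFrame
    import scala.util.Success

    val dfToCompare: DataFrame = ??? // stand-in for the reference DataFrame
    val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("id" -> "id"), _ > 0.8)
    val constraint = DataSynchronizationConstraint(analyzer, hint = Some("ids should be in sync"))

    // A metric as the runner would produce it, carrying a successfully computed ratio of 0.9.
    val metric = DoubleMetric(Entity.Dataset, "DataSynchronization", "", Success(0.9), None)
    val metrics: Map[Analyzer[_, Metric[_]], Metric[_]] =
      Map(analyzer.asInstanceOf[Analyzer[_, Metric[_]]] -> metric)

    val result = constraint.evaluate(metrics)
    // result.status == ConstraintStatus.Success, because 0.9 > 0.8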
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index a21bfc93..e260d2f1 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -22,9 +22,10 @@ import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy
 import com.amazon.deequ.checks.Check
 import com.amazon.deequ.checks.CheckLevel
 import com.amazon.deequ.checks.CheckStatus
-import com.amazon.deequ.constraints.{Constraint, ConstraintResult}
+import com.amazon.deequ.constraints.Constraint
 import com.amazon.deequ.io.DfsUtils
-import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
+import com.amazon.deequ.metrics.DoubleMetric
+import com.amazon.deequ.metrics.Entity
 import com.amazon.deequ.repository.MetricsRepository
 import com.amazon.deequ.repository.ResultKey
 import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
@@ -32,6 +33,9 @@ import com.amazon.deequ.utils.CollectionUtils.SeqExtensions
 import com.amazon.deequ.utils.FixtureSupport
 import com.amazon.deequ.utils.TempFileUtils
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.when
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.Matchers
 import org.scalatest.WordSpec
@@ -806,6 +810,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val complianceCheckThatShouldFailCompleteness = Check(CheckLevel.Error, "shouldErrorStringType")
         .hasCompleteness("fake", x => x > 0)
 
+      val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge")
+        .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldPass"))
+
       val verificationResult = VerificationSuite()
         .onData(df)
         .addCheck(checkThatShouldSucceed)
@@ -815,6 +822,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
         .addCheck(checkThatShouldFail)
         .addCheck(complianceCheckThatShouldFail)
         .addCheck(complianceCheckThatShouldFailCompleteness)
+        .addCheck(checkHasDataInSyncTest)
         .run()
 
       val checkSuccessResult = verificationResult.checkResults(checkThatShouldSucceed)
@@ -846,6 +854,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       checkFailedCompletenessResult.constraintResults.map(_.message) shouldBe
         List(Some("Input data does not include column fake!"))
       assert(checkFailedCompletenessResult.status == CheckStatus.Error)
+
+      val checkDataSyncResult = verificationResult.checkResults(checkHasDataInSyncTest)
+      checkDataSyncResult.status shouldBe CheckStatus.Success
     }
 
     "Well-defined checks should produce correct result even if another check throws an exception" in withSparkSession {
@@ -973,6 +984,170 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
         List(Some("Value: 0.125 does not meet the constraint requirement!"))
       assert(subsetNameFailResult.status == CheckStatus.Error)
     }
+
+    "Data Synchronization checks should work for a single column" in withSparkSession {
+      sparkSession =>
+        val df = getDateDf(sparkSession).select("id", "product", "product_id", "units")
+        val dfModified = df.withColumn("id", when(col("id") === 100, 99)
+          .otherwise(col("id")))
+        val dfColRenamed = df.withColumnRenamed("id", "id_renamed")
+
+        val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check pass")
+          .isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.7, Some("shouldPass"))
+
+        val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check fail")
+          .isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.9, Some("shouldFail"))
+
+        val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema)
+        val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame")
+          .isDataSynchronized(emptyDf, Map("id" -> "id"), _ < 0.5)
+
+        val dataSyncCheckColMismatchDestination =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in destination")
+            .isDataSynchronized(dfModified, Map("id" -> "id2"), _ < 0.5)
+
+        val dataSyncCheckColMismatchSource =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in source")
+            .isDataSynchronized(dfModified, Map("id2" -> "id"), _ < 0.5)
+
+        val dataSyncCheckColRenamed =
+          Check(CheckLevel.Error, "data synchronization check col names renamed")
+            .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0)
+
+        val dataSyncFullMatch =
+          Check(CheckLevel.Error, "data synchronization check full match")
+            .isDataSynchronized(df, Map("id" -> "id"), _ == 1.0)
+
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addCheck(dataSyncCheckPass)
+          .addCheck(dataSyncCheckFail)
+          .addCheck(dataSyncCheckEmpty)
+          .addCheck(dataSyncCheckColMismatchDestination)
+          .addCheck(dataSyncCheckColMismatchSource)
+          .addCheck(dataSyncCheckColRenamed)
+          .addCheck(dataSyncFullMatch)
+          .run()
+
+        val passResult = verificationResult.checkResults(dataSyncCheckPass)
+        passResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(passResult.status == CheckStatus.Success)
+
+        val failResult = verificationResult.checkResults(dataSyncCheckFail)
+        failResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: 0.8 does not meet the constraint requirement! shouldFail"))
shouldFail")) + assert(failResult.status == CheckStatus.Error) + + val emptyResult = verificationResult.checkResults(dataSyncCheckEmpty) + emptyResult.constraintResults.map(_.message) shouldBe + List(Some("Value: NaN does not meet the constraint requirement!")) + assert(emptyResult.status == CheckStatus.Error) + + val colMismatchDestResult = verificationResult.checkResults(dataSyncCheckColMismatchDestination) + colMismatchDestResult.constraintResults.map(_.message) shouldBe + List(Some("Value: NaN does not meet the constraint requirement!")) + assert(colMismatchDestResult.status == CheckStatus.Error) + + val colMismatchSourceResult = verificationResult.checkResults(dataSyncCheckColMismatchSource) + colMismatchSourceResult.constraintResults.map(_.message) shouldBe + List(Some("Value: NaN does not meet the constraint requirement!")) + assert(colMismatchSourceResult.status == CheckStatus.Error) + + val colRenamedResult = verificationResult.checkResults(dataSyncCheckColRenamed) + colRenamedResult.constraintResults.map(_.message) shouldBe List(None) + assert(colRenamedResult.status == CheckStatus.Success) + + val fullMatchResult = verificationResult.checkResults(dataSyncFullMatch) + fullMatchResult.constraintResults.map(_.message) shouldBe List(None) + assert(fullMatchResult.status == CheckStatus.Success) + + } + + "Should work Data Synchronization checks for multiple column" in withSparkSession { + sparkSession => + val df = getDateDf(sparkSession).select("id", "product", "product_id", "units") + val dfModified = df.withColumn("id", when(col("id") === 100, 99) + .otherwise(col("id"))) + val dfColRenamed = df.withColumnRenamed("id", "id_renamed") + val colMap = Map("id" -> "id", "product" -> "product") + + val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check") + .isDataSynchronized(dfModified, colMap, _ > 0.7, Some("shouldPass")) + + val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check") + .isDataSynchronized(dfModified, colMap, _ > 0.9, Some("shouldFail")) + + val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema) + val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame") + .isDataSynchronized(emptyDf, colMap, _ < 0.5) + + val dataSyncCheckColMismatchDestination = + Check(CheckLevel.Error, "data synchronization check col mismatch in destination") + .isDataSynchronized(dfModified, colMap, _ > 0.9) + + val dataSyncCheckColMismatchSource = + Check(CheckLevel.Error, "data synchronization check col mismatch in source") + .isDataSynchronized(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5) + + val dataSyncCheckColRenamed = + Check(CheckLevel.Error, "data synchronization check col names renamed") + .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0, + Some("shouldPass")) + + val dataSyncFullMatch = + Check(CheckLevel.Error, "data synchronization check col full match") + .isDataSynchronized(df, colMap, _ == 1, Some("shouldPass")) + + + val verificationResult = VerificationSuite() + .onData(df) + .addCheck(dataSyncCheckPass) + .addCheck(dataSyncCheckFail) + .addCheck(dataSyncCheckEmpty) + .addCheck(dataSyncCheckColMismatchDestination) + .addCheck(dataSyncCheckColMismatchSource) + .addCheck(dataSyncCheckColRenamed) + .addCheck(dataSyncFullMatch) + .run() + + val passResult = verificationResult.checkResults(dataSyncCheckPass) + passResult.constraintResults.map(_.message) shouldBe + List(None) + assert(passResult.status == 
+
+        val failResult = verificationResult.checkResults(dataSyncCheckFail)
+        failResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: 0.8 does not meet the constraint requirement! shouldFail"))
+        assert(failResult.status == CheckStatus.Error)
+
+        val emptyResult = verificationResult.checkResults(dataSyncCheckEmpty)
+        emptyResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(emptyResult.status == CheckStatus.Error)
+
+        val colMismatchDestResult = verificationResult.checkResults(dataSyncCheckColMismatchDestination)
+        colMismatchDestResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: 0.8 does not meet the constraint requirement!"))
+        assert(colMismatchDestResult.status == CheckStatus.Error)
+
+        val colMismatchSourceResult = verificationResult.checkResults(dataSyncCheckColMismatchSource)
+        colMismatchSourceResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(colMismatchSourceResult.status == CheckStatus.Error)
+
+        val colRenamedResult = verificationResult.checkResults(dataSyncCheckColRenamed)
+        colRenamedResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(colRenamedResult.status == CheckStatus.Success)
+
+        val fullMatchResult = verificationResult.checkResults(dataSyncFullMatch)
+        fullMatchResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(fullMatchResult.status == CheckStatus.Success)
+
+    }
   }
 
   /** Run anomaly detection using a repository with some previous analysis results for testing */
diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
index 0a46b9e2..505e6d13 100644
--- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
+++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -25,11 +25,12 @@ import com.amazon.deequ.metrics.{DoubleMetric, Entity}
 import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
 import com.amazon.deequ.repository.{MetricsRepository, ResultKey}
 import com.amazon.deequ.utils.FixtureSupport
+import org.apache.spark.sql.functions.{col, when}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.scalamock.scalatest.MockFactory
-import org.scalatest.wordspec.AnyWordSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
 
 import scala.util.{Success, Try}
 
@@ -1107,6 +1108,131 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
     }
   }
 
+  /**
+   * Tests for the data synchronization check in the verification suite.
+   */
+ */ + "Check hasDataInSync" should { + + val colMapAtt1 = Map("att1" -> "att1") + val colMapTwoCols = Map("att1" -> "att1", "att2" -> "att2") + + "yield success for basic data sync test for 1 col" in withSparkSession { sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + + val check = Check(CheckLevel.Error, "must have data in sync") + .isDataSynchronized(dfInformative, colMapAtt1, _ > 0.9, Some("show be in sync")) + val context = runChecks(dfInformative, check) + + assertSuccess(check, context) + } + + "yield failure when column doesnt exist in data sync test for 1 col" in withSparkSession { sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed") + + val check = Check(CheckLevel.Error, "must fail as columns does not exist") + .isDataSynchronized(dfInformativeRenamed, colMapAtt1, _ > 0.9, Some("must fail as columns does not exist")) + val context = runChecks(dfInformative, check) + assertEvaluatesTo(check, context, CheckStatus.Error) + println(context) + } + + "yield failure when row count varies in data sync test for 1 col" in withSparkSession { sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + val dfInformativeFiltered = dfInformative.filter("att1 > 2") + + val check = Check(CheckLevel.Error, "must fail as columns does not exist") + .isDataSynchronized(dfInformativeFiltered, colMapAtt1, _ > 0.9, Some("must fail as columns does not exist")) + val context = runChecks(dfInformative, check) + assertEvaluatesTo(check, context, CheckStatus.Error) + } + + "yield failed assertion for 0.9 for 1 col" in withSparkSession { sparkSession => + val df = getDfWithConditionallyInformativeColumns(sparkSession) + val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4) + .otherwise(col("att1"))) + + val check = Check(CheckLevel.Error, "must fail as rows mismatches") + .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.9, Some("must fail as rows mismatches")) + val context = runChecks(df, check) + assertEvaluatesTo(check, context, CheckStatus.Error) + + } + + "yield failed assertion for 0.6 for 1 col" in withSparkSession { sparkSession => + val df = getDfWithConditionallyInformativeColumns(sparkSession) + val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4) + .otherwise(col("att1"))) + + val check = Check(CheckLevel.Error, "must be success as rows count mismatches at assertion 0.6") + .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.6, + Some("must be success as rows count mismatches at assertion 0.6")) + val context = runChecks(df, check) + assertSuccess(check, context) + } + + + "yield success for basic data sync test for multiple columns" in withSparkSession { sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + + val check = Check(CheckLevel.Error, "must have data in sync") + .isDataSynchronized(dfInformative, colMapTwoCols, _ > 0.9, Some("show be in sync")) + val context = runChecks(dfInformative, check) + + assertSuccess(check, context) + } + + "yield failure when column doesnt exist in data sync test for multiple columns" in withSparkSession { + sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed") + + val check = Check(CheckLevel.Error, "must fail as columns does not exist") + 
+          .isDataSynchronized(dfInformativeRenamed, colMapTwoCols, _ > 0.9, Some("must fail as column does not exist"))
+        val context = runChecks(dfInformative, check)
+
+        assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failure when row count varies in data sync test for multiple columns" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+      val dfInformativeFiltered = dfInformative.filter("att1 > 2")
+
+      val check = Check(CheckLevel.Error, "must fail as row counts differ")
+        .isDataSynchronized(dfInformativeFiltered, colMapTwoCols, _ > 0.9, Some("must fail as row counts differ"))
+      val context = runChecks(dfInformative, check)
+
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failed assertion for 0.9 for multiple columns" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must fail as rows mismatch")
+        .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.9, Some("must fail as rows mismatch"))
+      val context = runChecks(df, check)
+
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+
+    }
+
+    "yield failed assertion for 0.6 for multiple columns" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must succeed as metric value is 0.66")
+        .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.6,
+          Some("must succeed as metric value is 0.66"))
+      val context = runChecks(df, check)
+
+      assertSuccess(check, context)
+    }
+
+  }
+
   /** Run anomaly detection using a repository with some previous analysis results for testing */
   private[this] def evaluateWithRepository(test: MetricsRepository => Unit): Unit = {
diff --git a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
index 932b3cba..dd3a002d 100644
--- a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
+++ b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -56,8 +56,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("name" -> "name")
       val assertion: Double => Boolean = _ >= 0.60
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.isInstanceOf[ComparisonSucceeded])
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.isInstanceOf[DataSynchronizationSucceeded])
     }
 
     "match == 0.83 when id is colKey and state is compCols" in withSparkSession { spark =>
@@ -87,8 +87,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("state" -> "state")
       val assertion: Double => Boolean = _ >= 0.80
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.isInstanceOf[ComparisonSucceeded])
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.isInstanceOf[DataSynchronizationSucceeded])
     }
 
     "return false because col name isn't unique" in withSparkSession { spark =>
@@ -118,8 +118,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("state" -> "state")
       val assertion: Double => Boolean = _ >= 0.66
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.isInstanceOf[ComparisonFailed])
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.isInstanceOf[DataSynchronizationFailed])
     }
 
     "match >= 0.66 when id is unique col, rest compCols" in withSparkSession { spark =>
@@ -149,8 +149,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("name" -> "name", "state" -> "state")
       val assertion: Double => Boolean = _ >= 0.60
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.isInstanceOf[ComparisonSucceeded])
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.isInstanceOf[DataSynchronizationSucceeded])
     }
 
     "match >= 0.66 (same test as above only the data sets change)" in withSparkSession{ spark =>
@@ -180,8 +180,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("name" -> "name", "state" -> "state")
       val assertion: Double => Boolean = _ >= 0.60
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.isInstanceOf[ComparisonSucceeded])
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.isInstanceOf[DataSynchronizationSucceeded])
     }
 
     "return false because the id col in ds1 isn't unique" in withSparkSession { spark =>
@@ -212,8 +212,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compCols = Map("name" -> "name", "state" -> "state")
       val assertion: Double => Boolean = _ >= 0.40
 
-      val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion))
-      assert(result.asInstanceOf[ComparisonFailed].errorMessage ==
+      val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)
+      assert(result.asInstanceOf[DataSynchronizationFailed].errorMessage ==
         "The selected columns are not comparable due to duplicates present in the dataset." +
+ "Comparison keys must be unique, but in Dataframe 1, there are 6 unique records and 7 rows, " + "and in Dataframe 2, there are 6 unique records and 6 rows, based on the combination of keys " + @@ -248,8 +248,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name", "state" -> "state") val assertion: Double => Boolean = _ >= 0.40 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "return false because col state isn't unique" in withSparkSession { spark => @@ -280,7 +280,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "check all columns and return an assertion of .66" in withSparkSession { spark => @@ -309,8 +309,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "return false because state column isn't unique" in withSparkSession { spark => @@ -339,8 +339,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("state" -> "state") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "check all columns" in withSparkSession { spark => @@ -369,8 +369,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "cols exist but 0 matches" in withSparkSession { spark => import spark.implicits._ @@ -398,8 +398,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } } @@ -643,7 +643,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { // Overall val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(overallResult.isInstanceOf[ComparisonSucceeded]) + assert(overallResult.isInstanceOf[DataSynchronizationSucceeded]) // 
 
       // Row Level
       val outcomeColName = "outcome"
@@ -670,7 +670,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       // Overall
       val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
       val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)
-      assert(overallResult.isInstanceOf[ComparisonSucceeded])
+      assert(overallResult.isInstanceOf[DataSynchronizationSucceeded])
 
       // Row Level
       val outcomeColName = "outcome"
@@ -700,8 +700,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2)
       val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion)
-      assert(overallResult1.isInstanceOf[ComparisonFailed])
-      val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
+      assert(overallResult1.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset"))
       assert(failedOverallResult1.errorMessage.contains(nonExistCol1))
@@ -716,8 +716,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val colKeyMap2 = Map(nonExistCol1 -> idColumnName)
       val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion)
-      assert(overallResult2.isInstanceOf[ComparisonFailed])
-      val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
+      assert(overallResult2.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset"))
       assert(failedOverallResult2.errorMessage.contains(nonExistCol1))
@@ -732,8 +732,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val colKeyMap3 = Map(idColumnName -> nonExistCol2)
       val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion)
-      assert(overallResult3.isInstanceOf[ComparisonFailed])
-      val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
+      assert(overallResult3.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset"))
       assert(failedOverallResult3.errorMessage.contains(nonExistCol2))
@@ -759,8 +759,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compColsMap1 = Map(nonExistCol1 -> nonExistCol2)
 
       val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion)
-      assert(overallResult1.isInstanceOf[ComparisonFailed])
-      val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
+      assert(overallResult1.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult1.errorMessage.contains(
         s"The following columns were not found in the first dataset: $nonExistCol1"))
@@ -775,8 +775,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compColsMap2 = Map(nonExistCol1 -> "State")
 
       val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion)
-      assert(overallResult2.isInstanceOf[ComparisonFailed])
-      val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
+      assert(overallResult2.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult2.errorMessage.contains(
         s"The following columns were not found in the first dataset: $nonExistCol1"))
@@ -791,8 +791,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val compColsMap3 = Map("state" -> nonExistCol2)
 
       val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion)
-      assert(overallResult3.isInstanceOf[ComparisonFailed])
-      val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
+      assert(overallResult3.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult3.errorMessage.contains(
         s"The following columns were not found in the second dataset: $nonExistCol2"))
@@ -800,7 +800,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
       val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap3))
       assert(rowLevelResult3.isLeft)
       val failedRowLevelResult3 = rowLevelResult3.left.get
-      assert(failedOverallResult3.errorMessage.contains(
+      assert(failedRowLevelResult3.errorMessage.contains(
         s"The following columns were not found in the second dataset: $nonExistCol2"))
     }
 }