diff --git a/src/main/resources/statistics/cmap.conf b/src/main/resources/statistics/cmap.conf index 7fd60e9..3a8cee5 100644 --- a/src/main/resources/statistics/cmap.conf +++ b/src/main/resources/statistics/cmap.conf @@ -1,31 +1,77 @@ +compare_with_old = true + +# old_flat tables are only used when comparing flat data old_flat = [ { name = "DCIR" main_table = "ER_PRS_F" date_format = "dd/MM/yyyy" input_path = "/shared/Observapur/old_flattening/joins/DCIR" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/DCIR" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/old_flat" } { name = "MCO" main_table = "MCO_C" date_format = "ddMMMyyyy" input_path = "/shared/Observapur/old_flattening/joins/MCO" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/MCO" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/old_flat" } ] -new_flat = [ +main_flat = [ { name = "DCIR" main_table = "ER_PRS_F" input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/DCIR" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/new_flat" } { name = "MCO" main_table = "MCO_C" input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/MCO" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/new_flat" + } +] + +single_tables = [ + { + name = "ER_CAM_F" + input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_CAM_F" + } + { + name = "ER_PHA_F" + input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PHA_F" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PHA_F" + } + { + name = "ER_PRS_F" + 
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PRS_F" + } + { + name = "MCO_A" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_A" + } + { + name = "MCO_B" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_B" + } + { + name = "MCO_C" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_C" + } + { + name = "MCO_D" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_D" + } + { + name = "MCO_UM" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_UM" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_UM" } ] \ No newline at end of file diff --git a/src/main/resources/statistics/test.conf b/src/main/resources/statistics/test.conf index 56b6d16..d532a8e 100644 --- a/src/main/resources/statistics/test.conf +++ b/src/main/resources/statistics/test.conf @@ -1,3 +1,5 @@ +compare_with_old = true + new_flat = [ { name = "MCO" diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala index 370d5e3..f564d0f 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala @@ -2,53 +2,51 @@ package fr.polytechnique.cmap.cnam.statistics import org.apache.spark.sql.DataFrame import 
org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{NumericType, StructField, StructType} +import org.apache.spark.sql.types.{NumericType, StructField} object CustomStatistics { implicit class Statistics(val df: DataFrame) { - def customDescribe(inputColumns: Seq[String], distinctOnly: Boolean = false): DataFrame = { + def customDescribe( + colNames: Seq[String] = df.columns, + distinctOnly: Boolean = false): DataFrame = { - def computeAvailableAgg( - schema: StructType, - colName: String): DataFrame = df.schema(colName) match { + def columnStatistics(colName: String): DataFrame = df.schema(colName) match { + // For numeric type, compute all stats case StructField(numericColumn: String, _: NumericType, _, _) => df.select(numericColumn) - .agg( - min(numericColumn).cast("string").as("Min"), - max(numericColumn).cast("string").as("Max"), - count(numericColumn).cast("long").as("Count"), - countDistinct(numericColumn).cast("long").as("CountDistinct"), - sum(numericColumn).cast("double").as("Sum"), - sumDistinct(numericColumn).cast("double").as("SumDistinct"), - avg(numericColumn).cast("double").as("Avg") - ).withColumn("ColName", lit(numericColumn)) - + .agg( + min(numericColumn).cast("string").as("Min"), + max(numericColumn).cast("string").as("Max"), + count(numericColumn).cast("long").as("Count"), + countDistinct(numericColumn).cast("long").as("CountDistinct"), + sum(numericColumn).cast("double").as("Sum"), + sumDistinct(numericColumn).cast("double").as("SumDistinct"), + avg(numericColumn).cast("double").as("Avg") + ).withColumn("ColName", lit(numericColumn)) + + // For other types, compute only min, max and counts case _ => df.select(colName) - .agg( - min(colName).cast("string").as("Min"), - max(colName).cast("string").as("Max"), - count(colName).cast("long").as("Count"), - countDistinct(colName).cast("long").as("CountDistinct") - ).withColumn("Sum", lit(null).cast("double")) - .withColumn("SumDistinct", lit(null).cast("double")) - 
.withColumn("Avg", lit(null).cast("double")) - .withColumn("ColName", lit(colName)) + .agg( + min(colName).cast("string").as("Min"), + max(colName).cast("string").as("Max"), + count(colName).cast("long").as("Count"), + countDistinct(colName).cast("long").as("CountDistinct") + ).withColumn("Sum", lit(null).cast("double")) + .withColumn("SumDistinct", lit(null).cast("double")) + .withColumn("Avg", lit(null).cast("double")) + .withColumn("ColName", lit(colName)) } - val outputDF: DataFrame = inputColumns - .map(computeAvailableAgg(df.schema, _)) + val outputDF: DataFrame = colNames + .map(columnStatistics) .reduce(_.union(_)) - if (distinctOnly) { - outputDF - .drop("Count", "Sum", "Avg") - } else { - outputDF - } + if (distinctOnly) outputDF.drop("Count", "Sum", "Avg") + else outputDF } } } diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala similarity index 85% rename from src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala rename to src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala index 137e7c7..57a231f 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala @@ -5,7 +5,7 @@ import org.apache.spark.sql.functions.col import fr.polytechnique.cmap.cnam.statistics import fr.polytechnique.cmap.cnam.utilities.DFUtils -object OldFlatHelper { +object FlatTableHelper { implicit class ImplicitDF(data: DataFrame) { @@ -60,13 +60,15 @@ object OldFlatHelper { import statistics.CustomStatistics._ - def computeStatistics: DataFrame = data.customDescribe(data.columns) + def computeStatistics(distinctOnly: Boolean): DataFrame = { + data.customDescribe(data.columns, distinctOnly) + } - def writeStatistics(outputPath: String): Unit = { + def writeStatistics(outputPath: String, distinctOnly: Boolean = false): Unit = { 
data - .computeStatistics - .write - .parquet(outputPath) + .computeStatistics(distinctOnly) + .write + .parquet(outputPath) } } diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala new file mode 100644 index 0000000..6966f30 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala @@ -0,0 +1,32 @@ +package fr.polytechnique.cmap.cnam.statistics + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions._ +import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ + +object SingleTablesStatistics { + + // Todo: This method works only when there are no unseen keys (key colNames) in the individual DFs + // that don't exist in the DF that contains the key (e.g., PRS for DCIR, C for PMSI MCO). + // We should waive this restriction in future versions. + + def describeSingleTables(singleTables: List[DataFrame]): DataFrame = { + + // Selects all the data of a column from all tables containing this column + def selectColumnFromTables(colName: String): DataFrame = { + singleTables + .collect { + case table if table.columns.contains(colName) => table.select(col(colName)) + } + .reduce(_.union(_)) + } + + val columnsToDescribe: Set[String] = singleTables.map(_.columns).reduce(_ ++ _).toSet + + val describedColumns: List[DataFrame] = columnsToDescribe.map { + colName => selectColumnFromTables(colName).customDescribe(distinctOnly = true) + }.toList + + describedColumns.reduce(_.union(_)) + } +} \ No newline at end of file diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala index 3974295..143d6d6 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala @@ 
-17,6 +17,8 @@ object StatisticsConfig { newConfig.withFallback(defaultConfig).resolve() } + val compareWithOldFlattening: Boolean = if (conf.hasPath("compare_with_old")) conf.getBoolean("compare_with_old") else false + val oldFlatConfig: List[Config] = { if(conf.hasPath("old_flat")) { conf.getConfigList("old_flat").asScala.toList @@ -25,7 +27,7 @@ object StatisticsConfig { } } - val newFlatConfig: List[Config] = { + val mainFlatConfig: List[Config] = { if(conf.hasPath("new_flat")) { conf.getConfigList("new_flat").asScala.toList } else { @@ -33,6 +35,14 @@ object StatisticsConfig { } } + val singleTablesConfig: List[Config] = { + if(conf.hasPath("single_tables")) { + conf.getConfigList("single_tables").asScala.toList + } else { + List[Config]() + } + } + implicit class StatConfig(statConf: Config) { val flatTableName: String = statConf.getString("name") diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala index c1a1d42..463d150 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala @@ -2,6 +2,7 @@ package fr.polytechnique.cmap.cnam.statistics import org.apache.spark.sql.{Dataset, SQLContext} import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig +import fr.polytechnique.cmap.cnam.utilities.DFUtils.readParquet import fr.polytechnique.cmap.cnam.{Main, utilities} case class TableSchema(tableName: String, columnTypes: Map[String, String]) @@ -12,36 +13,62 @@ object StatisticsMain extends Main { def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = { - argsMap.get("conf").foreach(sqlContext.setConf("conf", _)) - argsMap.get("env").foreach(sqlContext.setConf("env", _)) + def describeOldFlatTable(): Unit = { - import utilities.DFUtils._ - val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => - TableSchema(x._1, x._2.toMap)).toList + import 
FlatTableHelper.ImplicitDF + import StatisticsConfig.StatConfig - val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => - TableSchema(x._1, x._2.toMap)).toList + val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => + TableSchema(x._1, x._2.toMap)).toList - StatisticsConfig.oldFlatConfig.foreach { conf => - logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}") - println(conf.prettyPrint) + StatisticsConfig.oldFlatConfig.foreach { conf => + logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}") + println(conf.prettyPrint) - readParquet(sqlContext, conf.inputPath) - .drop("key") - .changeColumnNameDelimiter - .changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat) - .writeStatistics(conf.statOutputPath) + readParquet(sqlContext, conf.inputPath) + .drop("key") + .changeColumnNameDelimiter + .changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat).writeStatistics(conf.statOutputPath) + } } - StatisticsConfig.newFlatConfig.foreach { conf => - logger.info(s"Computing Statistics on the new Flat: ${conf.flatTableName}") - println(conf.prettyPrint) + def describeSingleTables(): Unit = { + + import StatisticsConfig.StatConfig + + StatisticsConfig.singleTablesConfig.foreach { conf => + logger.info(s"Computing Statistics on the single table: ${conf.flatTableName}") + println(conf.prettyPrint) - readParquet(sqlContext, conf.inputPath) - .drop("year") - .writeStatistics(conf.statOutputPath) + import CustomStatistics.Statistics + readParquet(sqlContext, conf.inputPath) + .customDescribe(distinctOnly = true) + .write.parquet(conf.statOutputPath) + } } + def describeMainFlatTable(): Unit = { + + import FlatTableHelper.ImplicitDF + import StatisticsConfig.StatConfig + + StatisticsConfig.mainFlatConfig.foreach { conf => + logger.info(s"Computing Statistics on the main Flat: ${conf.flatTableName}") + println(conf.prettyPrint) + + readParquet(sqlContext, conf.inputPath) + .drop("year") + .writeStatistics(conf.statOutputPath) + } + } + + 
argsMap.get("conf").foreach(sqlContext.setConf("conf", _)) + argsMap.get("env").foreach(sqlContext.setConf("env", _)) + + if(StatisticsConfig.compareWithOldFlattening) describeOldFlatTable() + describeSingleTables() + describeMainFlatTable() + None } } diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelperSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala similarity index 94% rename from src/test/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelperSuite.scala rename to src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala index a80a811..d555ecc 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelperSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala @@ -4,7 +4,7 @@ import java.sql.Date import org.apache.spark.sql.types.TimestampType import fr.polytechnique.cmap.cnam.{SharedContext, utilities} -class OldFlatHelperSuite extends SharedContext { +class FlatTableHelperSuite extends SharedContext { "changeColumnNameDelimiter" should "change the column name delimiters from dot to underscore " in { @@ -21,7 +21,7 @@ class OldFlatHelperSuite extends SharedContext { ).toDF("key1", "noDelimiter", "key__delimiter1", "key__delimiter2") // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF val result = inputDf.changeColumnNameDelimiter // Then @@ -53,7 +53,7 @@ class OldFlatHelperSuite extends SharedContext { val dateFormat = "yyyy-MM-dd" // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF val result = inputDf.changeSchema(inputSchema, mainTableName, dateFormat) // Then @@ -88,7 +88,7 @@ class OldFlatHelperSuite extends SharedContext { val mainTableName = "ER_PRS_F" // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF val result = inputDf.changeSchema(schemaMap, mainTableName) // Then @@ -130,7 +130,7 @@ class OldFlatHelperSuite extends 
SharedContext { val sampleDf = Seq("dummyValue").toDF("Col_1") // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF val testResult = testInput.map { testInput => sampleDf.annotateJoiningTablesColumns(testInput, mainTableName) } @@ -152,7 +152,7 @@ class OldFlatHelperSuite extends SharedContext { val sampleDf = Seq("dummyValue").toDF("Col_1") // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF val result = sampleDf.prefixColName(tableName, columnName) // Then @@ -169,7 +169,7 @@ class OldFlatHelperSuite extends SharedContext { val expectedResult = sqlContext.read.parquet(expectedResultPath) // When - import OldFlatHelper.ImplicitDF + import FlatTableHelper.ImplicitDF inputDf.writeStatistics(resultPath) val result = sqlContext.read.parquet(resultPath) diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala index 69b6e26..c9f5699 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala @@ -15,7 +15,7 @@ class StatisticsConfigSuite extends SharedContext { // When import fr.polytechnique.cmap.cnam.statistics.StatisticsConfig._ - val result = StatisticsConfig.newFlatConfig.head.prettyPrint + val result = StatisticsConfig.mainFlatConfig.head.prettyPrint println(result) println(expectedResult) diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala index de8132f..ddce0bc 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala @@ -18,6 +18,5 @@ class StatisticsMainSuite extends SharedContext{ // Then import utilities.DFUtils.CSVDataFrame 
assert(result sameAs expectedResult) - } }