diff --git a/src/main/resources/statistics/cmap.conf b/src/main/resources/statistics/cmap.conf index 3a8cee5..9889cea 100644 --- a/src/main/resources/statistics/cmap.conf +++ b/src/main/resources/statistics/cmap.conf @@ -1,16 +1,16 @@ -compare_with_old = true +describe_old = true -# old_flat tables are only used when comparing flat data +# old_flat tables are only used when describing old flat data old_flat = [ { - name = "DCIR" + table_name = "DCIR" main_table = "ER_PRS_F" date_format = "dd/MM/yyyy" input_path = "/shared/Observapur/old_flattening/joins/DCIR" output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/old_flat" } { - name = "MCO" + table_name = "MCO" main_table = "MCO_C" date_format = "ddMMMyyyy" input_path = "/shared/Observapur/old_flattening/joins/MCO" @@ -20,58 +20,51 @@ old_flat = [ main_flat = [ { - name = "DCIR" + table_name = "DCIR" main_table = "ER_PRS_F" input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/new_flat" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR" + single_tables = [ + { + table_name = "ER_CAM_F" + input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F" + } + { + table_name = "ER_PHA_F" + input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PHA_F" + } + { + table_name = "ER_PRS_F" + input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F" + } + ] } { - name = "MCO" + table_name = "MCO" main_table = "MCO_C" input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/new_flat" - } -] - -single_tables = [ - { - name = "ER_CAM_F" - input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_CAM_F" - } - { - name = "ER_PHA_F" - input_path = 
"/shared/Observapur/staging/Flattening/single_table/ER_PHA_F" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PHA_F" - } - { - name = "ER_PRS_F" - input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PRS_F" - } - { - name = "MCO_A" - input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_A" - } - { - name = "MCO_B" - input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_B" - } - { - name = "MCO_C" - input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_C" - } - { - name = "MCO_D" - input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_D" - } - { - name = "MCO_UM" - input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_UM" - output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_UM" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO" + single_tables = [ + { + table_name = "MCO_A" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A" + } + { + table_name = "MCO_B" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B" + } + { + table_name = "MCO_C" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C" + } + { + table_name = "MCO_D" + input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D" + } + { + table_name = "MCO_UM" + input_path = 
"/shared/Observapur/staging/Flattening/single_table/MCO_UM" + } + ] } ] \ No newline at end of file diff --git a/src/main/resources/statistics/test.conf b/src/main/resources/statistics/test.conf index d532a8e..a1b18bf 100644 --- a/src/main/resources/statistics/test.conf +++ b/src/main/resources/statistics/test.conf @@ -1,10 +1,10 @@ -compare_with_old = true +describe_old = true -new_flat = [ +main_flat = [ { - name = "MCO" + table_name = "MCO" main_table = "MCO_C" input_path = "src/test/resources/statistics/flat_table/input/newMCO" - output_stat_path = "target/test/output/statistics/newMCO" + output_stat_path = "target/test/output/statistics/MCO" } ] \ No newline at end of file diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfig.scala index 38f44e9..9a9ce14 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfig.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfig.scala @@ -24,19 +24,19 @@ object FlatteningConfig { newConfig.withFallback(defaultConfig).resolve() } - val schemaFilePath: List[String] = conf.getStringList("schema_file_path").asScala.toList + lazy val schemaFilePath: List[String] = conf.getStringList("schema_file_path").asScala.toList - val outputBasePath: String = conf.getString("single_table_path") + lazy val outputBasePath: String = conf.getString("single_table_path") - val tablesConfigList: List[Config] = conf.getConfigList("tables_config").asScala.toList + lazy val tablesConfigList: List[Config] = conf.getConfigList("tables_config").asScala.toList .map(_.getConfigList("tables").asScala.toList) .reduce(_ ::: _) - val partitionsList: List[ConfigPartition] = getPartitionList(tablesConfigList) - val joinTablesConfig: List[Config] = conf.getConfigList("join").asScala.toList + lazy val partitionsList: List[ConfigPartition] = getPartitionList(tablesConfigList) + lazy val joinTablesConfig: 
List[Config] = conf.getConfigList("join").asScala.toList - private val csvSchema = CSVSchemaReader.readSchemaFiles(schemaFilePath) - val columnTypes: Map[String, List[(String, String)]] = CSVSchemaReader.readColumnsType(csvSchema) + private lazy val csvSchema = CSVSchemaReader.readSchemaFiles(schemaFilePath) + lazy val columnTypes: Map[String, List[(String, String)]] = CSVSchemaReader.readColumnsType(csvSchema) implicit class SingleTableConfig(config: Config) { diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala index f564d0f..3f47a16 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala @@ -22,9 +22,9 @@ object CustomStatistics { max(numericColumn).cast("string").as("Max"), count(numericColumn).cast("long").as("Count"), countDistinct(numericColumn).cast("long").as("CountDistinct"), - sum(numericColumn).cast("double").as("Sum"), - sumDistinct(numericColumn).cast("double").as("SumDistinct"), - avg(numericColumn).cast("double").as("Avg") + round(sum(numericColumn).cast("double"), 4).as("Sum"), + round(sumDistinct(numericColumn).cast("double"), 4).as("SumDistinct"), + round(avg(numericColumn).cast("double"), 4).as("Avg") ).withColumn("ColName", lit(numericColumn)) // For other types, compute only min, max and counts diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelper.scala similarity index 89% rename from src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala rename to src/main/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelper.scala index 57a231f..81dda2d 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelper.scala +++ 
b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelper.scala @@ -5,7 +5,7 @@ import org.apache.spark.sql.functions.col import fr.polytechnique.cmap.cnam.statistics import fr.polytechnique.cmap.cnam.utilities.DFUtils -object FlatTableHelper { +object DataFrameHelper { implicit class ImplicitDF(data: DataFrame) { @@ -70,6 +70,12 @@ object FlatTableHelper { .write .parquet(outputPath) } + + def prefixColumnNames(prefix: String, separator: String = "__"): DataFrame = { + data.columns.foldLeft(data) { + (curDF, colName) => curDF.withColumnRenamed(colName, prefix + separator + colName) + } + } } } diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfig.scala new file mode 100644 index 0000000..987bddd --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfig.scala @@ -0,0 +1,42 @@ +package fr.polytechnique.cmap.cnam.statistics + +import scala.collection.JavaConverters._ +import com.typesafe.config.Config + +case class FlatTableConfig( + tableName: String, + centralTable: String, + dateFormat: String, + inputPath: String, + outputStatPath: String, + singleTables: List[SingleTableConfig]) { + + override def toString: String = { + s"tableName -> $tableName \n" + + s"centralTable -> $centralTable \n" + + s"dateFormat -> $dateFormat \n" + + s"inputPath -> $inputPath \n" + + s"outputStatPath -> $outputStatPath \n" + + s"singleTableCount -> ${singleTables.size}" + } +} + +object FlatTableConfig { + + def fromConfig(c: Config): FlatTableConfig = { + + val singleTables = if (c.hasPath("single_tables")) + c.getConfigList("single_tables").asScala.toList.map(SingleTableConfig.fromConfig) + else + List[SingleTableConfig]() + + FlatTableConfig( + tableName = c.getString("table_name"), + centralTable = c.getString("main_table"), + dateFormat = if (c.hasPath("date_format")) c.getString("date_format") else "dd/MM/yyyy", + inputPath = 
c.getString("input_path"), + outputStatPath = c.getString("output_stat_path"), + singleTables = singleTables + ) + } +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfig.scala new file mode 100644 index 0000000..4a1a2f1 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfig.scala @@ -0,0 +1,27 @@ +package fr.polytechnique.cmap.cnam.statistics + +import com.typesafe.config.Config + +case class SingleTableConfig( + tableName: String, + dateFormat: String, + inputPath: String) { + + override def toString: String = { + s"tableName -> $tableName \n" + + s"dateFormat -> $dateFormat \n" + + s"inputPath -> $inputPath" + } +} + +object SingleTableConfig { + + def fromConfig(c: Config): SingleTableConfig = { + SingleTableConfig( + tableName = c.getString("table_name"), + dateFormat = if (c.hasPath("date_format")) c.getString("date_format") else "dd/MM/yyyy", + inputPath = c.getString("input_path") + ) + } +} + diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala deleted file mode 100644 index 6966f30..0000000 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/SingleTablesStatistics.scala +++ /dev/null @@ -1,32 +0,0 @@ -package fr.polytechnique.cmap.cnam.statistics - -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.functions._ -import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ - -object SingleTablesStatistics { - - // Todo: This method works only when there is no unseen keys (key colNames) in the individual DFs - // that don't exists in the DF that contains the key (eq, PRS for DCIR, C for PMSI MCO ). - // We should waive this restriction in the future versions. 
- - def describeSingleTables(singleTables: List[DataFrame]): DataFrame = { - - // Selects all the data of a column from all tables containing this column - def selectColumnFromTables(colName: String): DataFrame = { - singleTables - .collect { - case table if table.columns.contains(colName) => table.select(col(colName)) - } - .reduce(_.union(_)) - } - - val columnsToDescribe: Set[String] = singleTables.map(_.columns).reduce(_ ++ _).toSet - - val describedColumns: List[DataFrame] = columnsToDescribe.map { - colName => selectColumnFromTables(colName).customDescribe(distinctOnly = true) - }.toList - - describedColumns.reduce(_.union(_)) - } -} \ No newline at end of file diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala index 143d6d6..54a3b8a 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala @@ -17,53 +17,21 @@ object StatisticsConfig { newConfig.withFallback(defaultConfig).resolve() } - val compareWithOldFlattening: Boolean = conf.getBoolean("compare_with_old") + val describeOldFlatTable: Boolean = conf.getBoolean("describe_old") - val oldFlatConfig: List[Config] = { + val oldFlatConfig: List[FlatTableConfig] = { if(conf.hasPath("old_flat")) { - conf.getConfigList("old_flat").asScala.toList + conf.getConfigList("old_flat").asScala.toList.map(FlatTableConfig.fromConfig) } else { - List[Config]() + List[FlatTableConfig]() } } - val mainFlatConfig: List[Config] = { - if(conf.hasPath("new_flat")) { - conf.getConfigList("new_flat").asScala.toList - } else { - List[Config]() - } - } - - val singleTablesConfig: List[Config] = { - if(conf.hasPath("single_tables")) { - conf.getConfigList("single_tables").asScala.toList - } else { - List[Config]() - } - } - - implicit class StatConfig(statConf: Config) { - - val flatTableName: String = 
statConf.getString("name") - val mainTableName: String = statConf.getString("main_table") - val dateFormat: String = { - if(statConf.hasPath("date_format")) { - statConf.getString("date_format") - } else { - "dd/MM/yyyy" - } - } - - val inputPath: String = statConf.getString("input_path") - val statOutputPath: String = statConf.getString("output_stat_path") - - def prettyPrint: String = { - s"flatTableName -> $flatTableName \n" + - s"mainTableName -> $mainTableName \n" + - s"dateFormat -> $dateFormat \n" + - s"inputPath -> $inputPath \n" + - s"statOutputPath -> $statOutputPath" + val mainFlatConfig: List[FlatTableConfig] = { + if(conf.hasPath("main_flat")) { + conf.getConfigList("main_flat").asScala.toList.map(FlatTableConfig.fromConfig) + } else { + List[FlatTableConfig]() } } } diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala index 463d150..f6f869f 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala @@ -1,9 +1,10 @@ package fr.polytechnique.cmap.cnam.statistics -import org.apache.spark.sql.{Dataset, SQLContext} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +import fr.polytechnique.cmap.cnam.Main import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig import fr.polytechnique.cmap.cnam.utilities.DFUtils.readParquet -import fr.polytechnique.cmap.cnam.{Main, utilities} case class TableSchema(tableName: String, columnTypes: Map[String, String]) @@ -11,63 +12,79 @@ object StatisticsMain extends Main { override def appName = "Statistics" - def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = { + def computeSingleTableStats( + singleTableData: DataFrame, + centralTableName: String, + singleConf: SingleTableConfig): DataFrame = { - def 
describeOldFlatTable(): Unit = { + logger.info(s"Computing Statistics on the single table: ${singleConf.tableName}") + println(singleConf) - import FlatTableHelper.ImplicitDF - import StatisticsConfig.StatConfig + import CustomStatistics.Statistics + import DataFrameHelper._ - val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => - TableSchema(x._1, x._2.toMap)).toList + val prefixedData = if(singleConf.tableName != centralTableName) + singleTableData.prefixColumnNames(singleConf.tableName, "__") + else + singleTableData - StatisticsConfig.oldFlatConfig.foreach { conf => - logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}") - println(conf.prettyPrint) - - readParquet(sqlContext, conf.inputPath) - .drop("key") - .changeColumnNameDelimiter - .changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat) - } - } + prefixedData.customDescribe(distinctOnly = true) + } - def describeSingleTables(): Unit = { + def describeFlatTable(data: DataFrame, flatConf: FlatTableConfig): Unit = { - import StatisticsConfig.StatConfig + import CustomStatistics.Statistics - StatisticsConfig.singleTablesConfig.foreach { conf => - logger.info(s"Computing Statistics on the single table: ${conf.flatTableName}") - println(conf.prettyPrint) + logger.info(s"Computing Statistics on the flat table: ${flatConf.tableName}") + println(flatConf) - import CustomStatistics.Statistics - readParquet(sqlContext, conf.inputPath) - .customDescribe(distinctOnly = true) - .write.parquet(conf.statOutputPath) - } - } + val flatTableStats = data.drop("year").customDescribe().persist() - def describeMainFlatTable(): Unit = { + flatTableStats.write.parquet(flatConf.outputStatPath + "/flat_table") - import FlatTableHelper.ImplicitDF - import StatisticsConfig.StatConfig + if(flatConf.singleTables.nonEmpty) { + val singleTablesStats = flatConf.singleTables.map { singleTableConf => + val singleTableData = readParquet(data.sqlContext, singleTableConf.inputPath) + 
computeSingleTableStats(singleTableData, flatConf.centralTable, singleTableConf) + }.reduce(_.union(_)).persist() - StatisticsConfig.mainFlatConfig.foreach { conf => - logger.info(s"Computing Statistics on the main Flat: ${conf.flatTableName}") - println(conf.prettyPrint) + val diff = flatTableStats + .select(singleTablesStats.columns.map(col): _*) + .except(singleTablesStats) - readParquet(sqlContext, conf.inputPath) - .drop("year") - .writeStatistics(conf.statOutputPath) - } + singleTablesStats.write.parquet(flatConf.outputStatPath + "/single_tables") + diff.write.parquet(flatConf.outputStatPath + "/diff") } + } + + override def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = { argsMap.get("conf").foreach(sqlContext.setConf("conf", _)) argsMap.get("env").foreach(sqlContext.setConf("env", _)) - if(StatisticsConfig.compareWithOldFlattening) describeOldFlatTable() - describeSingleTables() - describeMainFlatTable() + import DataFrameHelper.ImplicitDF + // Compute and save stats for old flattening + if(StatisticsConfig.describeOldFlatTable) { + + StatisticsConfig.oldFlatConfig.foreach { conf => + + val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => + TableSchema(x._1, x._2.toMap)).toList + + val oldFlatData = readParquet(sqlContext, conf.inputPath) + .drop("key") + .changeColumnNameDelimiter + .changeSchema(tablesSchema, conf.centralTable, conf.dateFormat) + + describeFlatTable(oldFlatData, conf) + } + } + + // Compute and save stats for main flattening + StatisticsConfig.mainFlatConfig.foreach { conf => + val flatData = readParquet(sqlContext, conf.inputPath) + describeFlatTable(flatData, conf) + } None } diff --git a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet deleted file mode 100644 index dcf1933..0000000 
Binary files a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet and /dev/null differ diff --git a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-87518957-cfd3-414e-aed7-0da7da831ce5.snappy.parquet b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-87518957-cfd3-414e-aed7-0da7da831ce5.snappy.parquet new file mode 100644 index 0000000..7b76b3d Binary files /dev/null and b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-87518957-cfd3-414e-aed7-0da7da831ce5.snappy.parquet differ diff --git a/src/test/resources/statistics/single-tables/CENTRAL/part-00000-7e623c7a-610a-411c-980e-57b5b19662da.snappy.parquet b/src/test/resources/statistics/single-tables/CENTRAL/part-00000-7e623c7a-610a-411c-980e-57b5b19662da.snappy.parquet new file mode 100644 index 0000000..424b659 Binary files /dev/null and b/src/test/resources/statistics/single-tables/CENTRAL/part-00000-7e623c7a-610a-411c-980e-57b5b19662da.snappy.parquet differ diff --git a/src/test/resources/statistics/single-tables/OTHER/part-00000-9da54d7c-7667-493c-acc3-913e49669ca2.snappy.parquet b/src/test/resources/statistics/single-tables/OTHER/part-00000-9da54d7c-7667-493c-acc3-913e49669ca2.snappy.parquet new file mode 100644 index 0000000..d83c3dd Binary files /dev/null and b/src/test/resources/statistics/single-tables/OTHER/part-00000-9da54d7c-7667-493c-acc3-913e49669ca2.snappy.parquet differ diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfigSuite.scala index ff9d4ca..b826297 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfigSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningConfigSuite.scala @@ -64,7 +64,8 @@ class FlatteningConfigSuite extends SharedContext { // Given val tableConfig = 
ConfigFactory.parseString( """ - { name = IR_BEN_R + { + name = IR_BEN_R partition_strategy = "none" partitions = [{path = [/shared/Observapur/raw_data/IR_BEN_R.CSV]}] } diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelperSuite.scala similarity index 91% rename from src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala rename to src/test/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelperSuite.scala index d555ecc..851830c 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableHelperSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/DataFrameHelperSuite.scala @@ -4,9 +4,9 @@ import java.sql.Date import org.apache.spark.sql.types.TimestampType import fr.polytechnique.cmap.cnam.{SharedContext, utilities} -class FlatTableHelperSuite extends SharedContext { +class DataFrameHelperSuite extends SharedContext { - "changeColumnNameDelimiter" should "change the column name delimiters from dot to underscore " in { + "changeColumnNameDelimiter" should "change the column name delimiters from dot to underscore " in { // Given val sqlCtx = sqlContext @@ -21,7 +21,7 @@ class FlatTableHelperSuite extends SharedContext { ).toDF("key1", "noDelimiter", "key__delimiter1", "key__delimiter2") // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF val result = inputDf.changeColumnNameDelimiter // Then @@ -53,7 +53,7 @@ class FlatTableHelperSuite extends SharedContext { val dateFormat = "yyyy-MM-dd" // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF val result = inputDf.changeSchema(inputSchema, mainTableName, dateFormat) // Then @@ -88,7 +88,7 @@ class FlatTableHelperSuite extends SharedContext { val mainTableName = "ER_PRS_F" // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF val result = inputDf.changeSchema(schemaMap,
mainTableName) // Then @@ -103,7 +103,7 @@ class FlatTableHelperSuite extends SharedContext { assert(inputDf.schema != result.schema) } - "annotateJoiningTablesColumns" should "prefix table name to the column names of the joining " + + "annotateJoiningTablesColumns" should "prefix table name to the column names of the joining " + "tables" in { // Given @@ -130,7 +130,7 @@ class FlatTableHelperSuite extends SharedContext { val sampleDf = Seq("dummyValue").toDF("Col_1") // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF val testResult = testInput.map { testInput => sampleDf.annotateJoiningTablesColumns(testInput, mainTableName) } @@ -152,7 +152,7 @@ class FlatTableHelperSuite extends SharedContext { val sampleDf = Seq("dummyValue").toDF("Col_1") // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF val result = sampleDf.prefixColName(tableName, columnName) // Then @@ -169,7 +169,7 @@ class FlatTableHelperSuite extends SharedContext { val expectedResult = sqlContext.read.parquet(expectedResultPath) // When - import FlatTableHelper.ImplicitDF + import DataFrameHelper.ImplicitDF inputDf.writeStatistics(resultPath) val result = sqlContext.read.parquet(resultPath) diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfigSuite.scala new file mode 100644 index 0000000..7094587 --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/FlatTableConfigSuite.scala @@ -0,0 +1,73 @@ +package fr.polytechnique.cmap.cnam.statistics + +import com.typesafe.config.ConfigFactory +import fr.polytechnique.cmap.cnam.SharedContext + +class FlatTableConfigSuite extends SharedContext { + + val tableName = "A_TABLE" + val centralTable = "CentralTable" + val dateFormat = "yyyy-MM-dd" + val inputPath = "/path/to/input/" + val outputStatPath = "/path/to/output/" + val singleTables = List( +
SingleTableConfig("A_SINGLE_TABLE", "yyyy-MM-dd", "/path/to/input/single_table") + ) + + "toString" should "print the class members" in { + + // Given + val expected = { + s"tableName -> $tableName \n" + + s"centralTable -> $centralTable \n" + + s"dateFormat -> $dateFormat \n" + + s"inputPath -> $inputPath \n" + + s"outputStatPath -> $outputStatPath \n" + + s"singleTableCount -> ${singleTables.size}" + } + val input = FlatTableConfig( + tableName, + centralTable, + dateFormat, + inputPath, + outputStatPath, + singleTables + ) + + // When + val result = input.toString + + // Then + assert(result == expected) + } + + "fromConfig" should "create a FlatTableConfig from a com.typesafe.config.Config instance" in { + + // Given + val config = ConfigFactory.parseString( + s""" + { + table_name = $tableName + main_table = $centralTable + date_format = $dateFormat + input_path = $inputPath + output_stat_path = $outputStatPath + } + """.stripMargin) + + val expected = FlatTableConfig( + tableName, + centralTable, + dateFormat, + inputPath, + outputStatPath, + List() + ) + + // When + val result = FlatTableConfig.fromConfig(config) + + // Then + assert(result == expected) + } +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfigSuite.scala new file mode 100644 index 0000000..2b5a6d0 --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/SingleTableConfigSuite.scala @@ -0,0 +1,48 @@ +package fr.polytechnique.cmap.cnam.statistics + +import com.typesafe.config.ConfigFactory +import fr.polytechnique.cmap.cnam.SharedContext + +class SingleTableConfigSuite extends SharedContext { + + val tableName = "A_TABLE" + val dateFormat = "yyyy-MM-dd" + val inputPath = "/path/to/parquet/" + + "toString" should "print the class members" in { + + // Given + val expected = { + s"tableName -> $tableName \n" + + s"dateFormat -> $dateFormat \n" + + s"inputPath -> 
$inputPath" + } + + // When + val result = SingleTableConfig(tableName, dateFormat, inputPath).toString + + // Then + assert(result == expected) + } + + "fromConfig" should "create a SingleTableConfig from a com.typesafe.config.Config instance" in { + + // Given + val config = ConfigFactory.parseString( + s""" + { + table_name = $tableName + date_format = $dateFormat + input_path = $inputPath + } + """.stripMargin) + + val expected = SingleTableConfig(tableName, dateFormat, inputPath) + + // When + val result = SingleTableConfig.fromConfig(config) + + // Then + assert(result == expected) + } +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala deleted file mode 100644 index c9f5699..0000000 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala +++ /dev/null @@ -1,25 +0,0 @@ -package fr.polytechnique.cmap.cnam.statistics - -import fr.polytechnique.cmap.cnam.SharedContext - -class StatisticsConfigSuite extends SharedContext { - - "toString" should "return all the values from the test config file correctly" in { - - // Given - val expectedResult = "flatTableName -> MCO \n" + - "mainTableName -> MCO_C \n" + - "dateFormat -> dd/MM/yyyy \n" + - "inputPath -> src/test/resources/statistics/flat_table/input/newMCO \n" + - "statOutputPath -> target/test/output/statistics/newMCO" - - // When - import fr.polytechnique.cmap.cnam.statistics.StatisticsConfig._ - val result = StatisticsConfig.mainFlatConfig.head.prettyPrint - - println(result) - println(expectedResult) - assert(result === expectedResult) - } - -} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala index ddce0bc..3de9d8f 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala +++ 
b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala @@ -1,15 +1,91 @@ package fr.polytechnique.cmap.cnam.statistics +import org.apache.spark.sql.Row import fr.polytechnique.cmap.cnam.{SharedContext, utilities} class StatisticsMainSuite extends SharedContext{ + "computeSingleTableStats" should "return the statistics for a given single table" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val centralTableName = "CENTRAL" + val singleConf = SingleTableConfig("SINGLE", "yyyy-MM-dd", "/path/to/something/") + val input = Seq( + ("1", 10, "1"), ("2", 20, "10"), ("2", 20, "10") + ).toDF("NUM_ENQ", "NUMERIC_COL", "STRING_COL") + val expected = Seq( + ("1", "2", 2L, None, "SINGLE__NUM_ENQ"), + ("10", "20", 2L, Some(30D), "SINGLE__NUMERIC_COL"), + ("1", "10", 2L, None, "SINGLE__STRING_COL") + ).toDF("Min", "Max", "CountDistinct", "SumDistinct", "ColName") + + // When + val result = StatisticsMain.computeSingleTableStats(input, centralTableName, singleConf) + + // Then + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expected) + } + + "describeFlatTable" should "write the statistics for a given flat table" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val flatConf = FlatTableConfig( + tableName = "FLAT", + centralTable = "CENTRAL", + dateFormat = "yyyy-MM-dd", + inputPath = "/path/to/something/", + outputStatPath = "target/test/output/stats", + singleTables = List( + SingleTableConfig("CENTRAL", "yyyy-MM-dd", "src/test/resources/statistics/single-tables/CENTRAL"), + SingleTableConfig("OTHER", "yyyy-MM-dd", "src/test/resources/statistics/single-tables/OTHER") + ) + ) + val input = Seq( + ("1", 10, "1"), + ("2", 20, "10"), + ("2", 20, "10") + ).toDF("NUM_ENQ", "NUMERIC_COL", "OTHER__STRING_COL") + val expectedFlat = Seq( + ("1", "2", 3L, 2L, None, None, None, "NUM_ENQ"), + ("10", "20", 3L, 2L, Some(50D), Some(30D), Some(16.6667D), "NUMERIC_COL"), + ("1", "10", 3L, 2L, None, 
None, None, "OTHER__STRING_COL") + ).toDF("Min", "Max", "Count", "CountDistinct", "Sum", "SumDistinct", "Avg", "ColName") + val expectedSingle = Seq( + ("1", "2", 2L, None, "NUM_ENQ"), + ("10", "20", 2L, Some(30D), "NUMERIC_COL"), + ("1", "10", 2L, None, "OTHER__STRING_COL"), + ("1", "2", 2L, None, "OTHER__NUM_ENQ"), + ("10", "20", 2L, Some(30D), "OTHER__NUMERIC_COL") + ).toDF("Min", "Max", "CountDistinct", "SumDistinct", "ColName") + val expectedDiff = sqlContext.createDataFrame(sc.parallelize(List[Row]()), expectedSingle.schema) + + + // When + StatisticsMain.describeFlatTable(input, flatConf) + val resultFlat = spark.read.parquet("target/test/output/stats/flat_table") + val resultSingle = spark.read.parquet("target/test/output/stats/single_tables") + val diff = spark.read.parquet("target/test/output/stats/diff") + + // Then + import utilities.DFUtils.CSVDataFrame + assert(resultFlat sameAs expectedFlat) + assert(resultSingle sameAs expectedSingle) + assert(diff sameAs expectedDiff) + } + "run" should "run the overall pipeline correctly without any error" in { // Given val expectedResultPath = "src/test/resources/statistics/flat_table/expected/newMCOStat" val expectedResult = sqlContext.read.parquet(expectedResultPath) - val resultPath = "target/test/output/statistics/newMCO" + val resultPath = "target/test/output/statistics/MCO/flat_table" // When StatisticsMain.run(sqlContext, Map())