From 9a291081946c2019002f373fff88f4ff72484d17 Mon Sep 17 00:00:00 2001 From: sathiya Date: Wed, 10 May 2017 14:48:34 +0200 Subject: [PATCH 1/2] CNAM-181 Initial Working Version --- src/main/resources/statistics/cmap.conf | 31 +++ src/main/resources/statistics/main.conf | 7 + src/main/resources/statistics/test.conf | 8 + .../cnam/statistics/CustomStatistics.scala | 55 ++++++ .../cnam/statistics/StatisticsConfig.scala | 50 +++++ .../cmap/cnam/statistics/StatisticsMain.scala | 102 ++++++++++ .../statistics/custom-statistics/IR_BEN_R.csv | 3 + ...2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet | Bin 0 -> 3440 bytes ...c14a-4f21-87a9-6b920d87bb55.snappy.parquet | Bin 0 -> 18886 bytes ...5b44-41be-a014-16f8c5b884a7.snappy.parquet | Bin 0 -> 19297 bytes ...032a-40cc-929e-13a34fe590dc.snappy.parquet | Bin 0 -> 19349 bytes .../statistics/CustomStatisticsSuite.scala | 122 ++++++++++++ .../statistics/StatisticsConfigSuite.scala | 25 +++ .../cnam/statistics/StatisticsMainSuite.scala | 187 ++++++++++++++++++ 14 files changed, 590 insertions(+) create mode 100644 src/main/resources/statistics/cmap.conf create mode 100644 src/main/resources/statistics/main.conf create mode 100644 src/main/resources/statistics/test.conf create mode 100644 src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala create mode 100644 src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala create mode 100644 src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala create mode 100644 src/test/resources/statistics/custom-statistics/IR_BEN_R.csv create mode 100644 src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet create mode 100644 src/test/resources/statistics/flat_table/input/newMCO/year=2006/part-00000-8944a73e-c14a-4f21-87a9-6b920d87bb55.snappy.parquet create mode 100644 src/test/resources/statistics/flat_table/input/newMCO/year=2007/part-00000-454fa049-5b44-41be-a014-16f8c5b884a7.snappy.parquet create mode 100644 src/test/resources/statistics/flat_table/input/newMCO/year=2008/part-00000-ca33fc10-032a-40cc-929e-13a34fe590dc.snappy.parquet create mode 100644 src/test/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatisticsSuite.scala create mode 100644 src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala create mode 100644 src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala diff --git a/src/main/resources/statistics/cmap.conf b/src/main/resources/statistics/cmap.conf new file mode 100644 index 0000000..7fd60e9 --- /dev/null +++ b/src/main/resources/statistics/cmap.conf @@ -0,0 +1,31 @@ +old_flat = [ + { + name = "DCIR" + main_table = "ER_PRS_F" + date_format = "dd/MM/yyyy" + input_path = "/shared/Observapur/old_flattening/joins/DCIR" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/DCIR" + } + { + name = "MCO" + main_table = "MCO_C" + date_format = "ddMMMyyyy" + input_path = "/shared/Observapur/old_flattening/joins/MCO" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/MCO" + } +] + +new_flat = [ + { + name = "DCIR" + main_table = "ER_PRS_F" + input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/DCIR" + } + { + name = "MCO" + main_table = "MCO_C" + input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO" + output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/MCO" + } +] \ No newline at end of file diff --git a/src/main/resources/statistics/main.conf b/src/main/resources/statistics/main.conf new file mode 100644 index 0000000..baf0034 --- /dev/null +++ b/src/main/resources/statistics/main.conf @@ -0,0 +1,7 @@ +cmap = { + include "cmap.conf" +} + +test = { + include "test.conf" +} \ No newline at end of file diff --git a/src/main/resources/statistics/test.conf b/src/main/resources/statistics/test.conf new file mode 100644 index 0000000..56b6d16 --- /dev/null +++ b/src/main/resources/statistics/test.conf @@ -0,0 +1,8 @@ +new_flat = [ + { + name = "MCO" + main_table = "MCO_C" + input_path = "src/test/resources/statistics/flat_table/input/newMCO" + output_stat_path = "target/test/output/statistics/newMCO" + } +] \ No newline at end of file diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala new file mode 100644 index 0000000..e8a1fa6 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala @@ -0,0 +1,55 @@ +package fr.polytechnique.cmap.cnam.statistics + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{NumericType, StructType} + +object CustomStatistics { + + implicit class Statistics(val df: DataFrame) { + + def customDescribe(inputColumns: Array[String], distinctOnly: Boolean = false): DataFrame = { + + def computeAvailableAgg(schema: StructType, + colName: String): DataFrame = + colName match { + case numericColumn if schema.apply(numericColumn).dataType.isInstanceOf[NumericType] => + df.select(numericColumn) + .agg( + min(numericColumn) cast "string" as "Min", + max(numericColumn) cast "string" as "Max", + count(numericColumn) cast "long" as "Count", + countDistinct(numericColumn) cast "long" as "CountDistinct", + sum(numericColumn) cast "string" as "Sum", + sumDistinct(numericColumn) cast "string" as "SumDistinct", + avg(numericColumn) cast "string" as "Avg" + ).withColumn("ColName", lit(numericColumn)) + + case _ => + df.select(colName) + .agg( + min(colName) cast "string" as "Min", + max(colName) cast "string" as "Max", + count(colName) cast "long" as "Count", + countDistinct(colName) cast "long" as "CountDistinct" + ).withColumn("Sum", lit("NA")) + .withColumn("SumDistinct", lit("NA")) + .withColumn("Avg", lit("NA")) + .withColumn("ColName", lit(colName)) + } + + val outputDF: DataFrame = inputColumns + .map(computeAvailableAgg(df.schema, _)) + .reduce(_.union(_)) + + if (distinctOnly) { + outputDF + .drop("Count") + .drop("Sum") + .drop("Avg") + } + else + outputDF + } + } +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala new file mode 100644 index 0000000..aed6469 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala @@ -0,0 +1,50 @@ +package fr.polytechnique.cmap.cnam.statistics + +import scala.collection.JavaConverters._ +import org.apache.spark.sql.SparkSession +import com.typesafe.config.{Config, ConfigFactory} + +object StatisticsConfig { + + private lazy val conf: Config = { + val sqlContext = SparkSession.builder().getOrCreate().sqlContext + val configPath: String = sqlContext.getConf("conf", "") + val environment: String = sqlContext.getConf("env", "test") + + val defaultConfig = ConfigFactory.parseResources("statistics/main.conf").resolve().getConfig(environment) + val newConfig = ConfigFactory.parseFile(new java.io.File(configPath)).resolve() + + newConfig.withFallback(defaultConfig).resolve() + } + + val oldFlatConfig: List[Config] = if(conf.hasPath("old_flat")) + conf.getConfigList("old_flat").asScala.toList + else + List[Config]() + + val newFlatConfig: List[Config] = if(conf.hasPath("new_flat")) + conf.getConfigList("new_flat").asScala.toList + else + List[Config]() + + implicit class StatConfig(statConf: Config) { + + val flatTableName: String = statConf.getString("name") + val mainTableName: String = statConf.getString("main_table") + val dateFormat: String = if(statConf.hasPath("date_format")) + statConf.getString("date_format") + else + "dd/MM/yyyy" + + val inputPath: String = statConf.getString("input_path") + val statOutputPath: String = statConf.getString("output_stat_path") + + def prettyPrint: String = { + s"flatTableName -> $flatTableName \n" + + s"mainTableName -> $mainTableName \n" + + s"dateFormat -> $dateFormat \n" + + s"inputPath -> $inputPath \n" + + s"statOutputPath -> $statOutputPath" + } + } +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala new file mode 100644 index 0000000..ffddefd --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala @@ -0,0 +1,102 @@ +package fr.polytechnique.cmap.cnam.statistics + +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig +import fr.polytechnique.cmap.cnam.utilities.DFUtils +import fr.polytechnique.cmap.cnam.{Main, statistics, utilities} + +object StatisticsMain extends Main { + + override def appName = "Statistics" + + implicit class OldFlatHelper(data: DataFrame) { + + final val OldDelimiter: String = "\\." + final val NewDelimiter: String = "__" + + def changeColumnNameDelimiter: DataFrame = { + val renamedColumns = data.columns + .map( + columnName => { + val splittedColName = columnName.split(OldDelimiter) + if (splittedColName.size == 2) + col("`" + columnName + "`").as(splittedColName(0) + NewDelimiter + splittedColName(1)) + else + col(columnName) + }) + + data.select(renamedColumns: _*) + } + + def changeSchema(schema: Map[String, List[(String,String)]], + mainTableName: String, + dateFormat: String = "dd/MM/yyyy"): DataFrame = { + + val unknownColumnNameType = Map("HOS_NNE_MAM" -> "String") + val flatSchema: Map[String, String] = schema + .map(tableColumns => annotateJoiningTablesColumns(tableColumns, mainTableName)) + .reduce(_ ++ _) ++ unknownColumnNameType + + DFUtils.applySchema(data, flatSchema, dateFormat) + } + + def annotateJoiningTablesColumns(tableSchema: (String, List[(String,String)]), + mainTableName: String): Map[String, String] = { + val tableName: String = tableSchema._1 + val columnTypeList: List[(String, String)] = tableSchema._2 + + tableName match { + case `mainTableName` => columnTypeList.toMap + case _ => columnTypeList.map(x => (prefixColName(tableName, x._1), x._2)).toMap + } + } + + def prefixColName(tableName: String, columnName: String): String = { + tableName + NewDelimiter + columnName + } + + import statistics.CustomStatistics._ + def computeStatistics: DataFrame = data.customDescribe(data.columns) + + def writeStatistics(outputPath: String): Unit = { + data + .computeStatistics + .write + .parquet(outputPath) + } + } + + def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = { + + argsMap.get("conf").foreach(sqlContext.setConf("conf", _)) + argsMap.get("env").foreach(sqlContext.setConf("env", _)) + + import utilities.DFUtils._ + val tablesSchema: Map[String, List[(String,String)]] = FlatteningConfig.columnTypes + + import StatisticsConfig.StatConfig + + StatisticsConfig.oldFlatConfig.foreach {conf => + logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}") + println(conf.prettyPrint) + + readParquet(sqlContext, conf.inputPath) + .drop("key") + .changeColumnNameDelimiter + .changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat) + .writeStatistics(conf.statOutputPath) + } + + StatisticsConfig.newFlatConfig.foreach {conf => + logger.info(s"Computing Statistics on the new Flat: ${conf.flatTableName}") + println(conf.prettyPrint) + + readParquet(sqlContext, conf.inputPath) + .drop("year") + .writeStatistics(conf.statOutputPath) + } + + None + } +} diff --git a/src/test/resources/statistics/custom-statistics/IR_BEN_R.csv b/src/test/resources/statistics/custom-statistics/IR_BEN_R.csv new file mode 100644 index 0000000..cc80ec8 --- /dev/null +++ b/src/test/resources/statistics/custom-statistics/IR_BEN_R.csv @@ -0,0 +1,3 @@ +BEN_CDI_NIR;BEN_DTE_INS;BEN_DTE_MAJ;BEN_NAI_ANN;BEN_NAI_MOI;BEN_RES_COM;BEN_RES_DPT;BEN_RNG_GEM;BEN_SEX_COD;BEN_TOP_CNS;MAX_TRT_DTD;ORG_CLE_NEW;ORG_AFF_BEN;BEN_DCD_AME;BEN_DCD_DTE;NUM_ENQ +00;;01/01/2006;1975;01;004;02A;1;2;1;;CODE1234;CODE1234;000101;;Patient_01 +00;;25/01/2006;1959;10;114;075;1;1;1;07/03/2008;CODE1234;CODE1234;200801;25/01/2008;Patient_02 diff --git a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..94f6a04f485b3a9ed83c0483a897e0e55e88e8cb GIT binary patch literal 3440 zcmb_fe{3699e?k9=bY@}yxr;>-C7#FJP9kKjf*+ZWrZfAubj(vI%xG0@Px8BW?|r`C&-Z=a_j`A)>!w0jvdNM+9A%OvEJB-1lt&0DGBibv5usm;$1u#u zm>Y-#0I9N^z@C({561Pz`a*Mg-Ii6{!>SCI#hHFNCC31h!UC(vE@8L}a5^+aX4P007E=IqZQ;1b4mk_dluFb^BBQz|QXR0e)*h*c$JB_rmbxV6@BPv+rlF z4DeZBsb|`?b>jH!^`*}}2^ikbaFxQDmjS=|rONOHAb67{cRydC>p5sX-+^%mw*#dg({nc2IqvmR9cHlI~qx9 z9IPZYS&36|HQXP8)9fgcRB$(FoWfmvURLlJn#GDYK{e(GNb1Dzc-34&K8$^BflHn~ zZ)`FlSGYbFSfEpJ4#hFYy~@2*%q0?1nko#3_J0!k=38>%NBq{`#(y)!-%^i!eZ#f2 z*}U`a#@)C1ueOEWU%c2B`YZ3nw?XI=BwHo|m04z;$b}K(>LM69Jo_?q!?aPEX@eY? zB-Q&Fz|#!iot+gV(wuL%0PRPvI1Zq`5;Kkvzlm6JV3_*wqH!-`Sd7L?+i!gbW`Mp< z#>-?MrH{Y@&^ujSEU@z#dak=0u$M{wKVc7Fbp&}du*shI`2<2gx#alr!u#mCpKW@m z#AvQ}5TVkx*GcK;$$Xi3DZQGfgjJmenxH^cj$?6{Y~u6s}p4hi{S=f8*9w=vp0d-umne?a(n+oTEJ;#`qWn)0wEjRN5FbYH4NNZM`(hlfcVeJA zC{jD~V{u8w-EmoxY1L@yR5BrH*pFhaEp7!b~Y|7i(7Co~W5(S6-j8O=GH^fj{s>Xq?8BZda!(*v*<2af|m6IKWR7Yd$w$ z)l0<9mP{DdD;D&YG3f)eCkvxl;_j`PwqC8`S?uO3wGwuV;LJ9vN8lbPaW*3W$?hod z6TL=t%FgTDRh%gU-+lm8ay47GZ9O*wKh=-eYJJp?_*^~*7dBtx{Ky|HdYXcrH_D-B zfhvzjLlLo>HEmiK~*?b9{ zbllhSAwRDdjKLaZo2*TT{9@L~0(cUF@D}ZAcG7p9TIKYTZB5tlZ6M9n@^-0UQ4`Q0 zO@#e?-JA(s?KP$hyHFg7Aad|qGPo@i zE4CU>%Gxn(O$MQ^o`o*S1w$`;j!Up}Glv^g0n<!LL%_t?VMiH_A!cI+uqQiCFvdbknHbeT5;p^qv=x4E5ua#^_kV8KmopS(vgVjF_ z4fle0_S8{n&Vj5;-;{=t7g7YwSO@keZZIyt+tu7}t=v3NiW*=hJIPQ}om^F?_-)DOB zkVKXx{hDF(wsIA9dp=;NE0+%Pb-g^+)7G&EiI5jG=Lr*htthOWZmfR6UOV|*bE!eR zhzKK$=%_3#kHAx>Gj5!rV|E0ITyAA!dA&3Cw8!~{we^MNlQ7rDS{q9b?A$PZu=-%m zp}%mdGe%-pZe_9BSZcz3;06x%2fr5P`-u?LoID$BM?e@@2>g``T<5}kX4l6=_ziA1 zvT>$^{I)Z2i-Wa2&`ozgVS^*oXD9j&_8Jqohr^rh%qPB~!uN!SmV5Ww@$gF9!#&{< z9zNQ?D@>51V|bYfY%}4H_GiRhZ-ek<-wt-SbRC;5CUB7f%wC)Ao*f%1Z3p`cCh!gu z-tOsOA3jUNw+Tg$AU^m|`;NhlCjx&KU^yc`;&BE+Bej!9 zT6=y`f-HJ|qq#1vuDmEMHP;*SjrE53Dtm5d{fnoYL!(1$>#G|l*N37*t%c^|{Myjy zGv|hu;rL+*k`3l}a0|<);C^{yaj`MC2mr8bMu#3a9U47%{(N-DxN+w3joT@_W4^fZ zZ_KU6;@T%u?i4vSSuVwYn_?SbXpvLrZcBMT-uH^g@D(4=T7NV@s^6E zc(3^LKnx*<7}E5Ckj4-~3^BwILW~cD5FZKQ!Q2o-j1R>4f!tA(`H+S+cA zh-l4PYlq&-d*rt$$cI`x8sM|BG1LfW=6VoC5Dwsx%@ z;8&qUEY{~(z6iGJTme@H2ocEEZQjY{>L#!RYGTHj}bI!xLh~b1DPTbu9 z2ORbbAJD@GoIus?I8O&If~`9FbfW+FL2IS9hWSa0-Uyl{;%SGjc<%=7 zX=QyI(kZ7jPX-NgplupIsNxdl%&&8-S=&w^)6M;b|ej(iu zrc}#60qu@yM6gvSpZ7QsTex)f>KLvoJ`YtrpvgrKY}GLggzS3wL)Tpn=VtiLo4Q+z zDa#Vc^`+xFe$uTSourF`6T(LJFS4)G)+@k9GuE%;M-%}M#R#kcoetVS`WdZ9mCnat zlmYldjHDIhDA9AVNNWR(8rq#x&grO*N%1S-n*qQHU_#UMpCh@ zPTff}(VYmo>F#m>ZN#Swe%Qy4pkG*YPdJ|x4T%KAFUPYYq}kjCS45BC@{nP;+h7BB z!9v*p=$lG7-~eb1VVBR4a?iun-6Z}3)E^anA5D56shv-Z`+bMH%Vg~F=^oE2wrz*^ z#Txq*yXuGc2%e@{#PD`UxyxMbC&3ixV?Dr!Q+hb%jAVA8GlbW;`TTz8mftfh)Ln-E ztIqXWmvTI4%Jud24CwS9av)pPPY+~xrY4=>akc}TxCfx5dVr%lvF@S9YiC3I?Z1z- z?b}t~aZDD%!;`EX>9Ry~Ui&mdCe^ZG_!QeiuvD-}&rhxoA#`{-Kcx%kS~* zsP*?~(ek@6RP1Q6leEEA!MIE5~vZ9?<-EC-YsC4-Ci0vJ72}BU!D89mVPcn6&B92Q%j0Tt`JxF%+Tp5d zu~bs+#Ro2eSxcrcD`_4qmk0Bspk~3MCTq@3tjaXX(5*W8>m%flnY-gHVDdH}EwkBpTQ$#g9B8eLK}qP?%L zPG6e!;hJ;R*g$sGhKo5I&aGM5hJpm7x0_<)$mU^pl3{i4s?v-^^$v7m;=uMsSOTld zpd>h?v$14mY2(ApkSGa_dHNe_C>c)V6eqE<(*dqfZ=S|WYXN@aGMRJ;xXyUww`#+oEtV z=u$Yk95z}rvScN{k+ZjPDomjvgt7w&fqoi}E{36-W1Uqjg!v{5`U5j*=4d|!04aiuLoN=3$@hoqF{L?}cLJ;X#WDWWaxWG198aj9FX6bWou zYHb|$7MD7|vO{ZNTY{V^3DH9I1LOQku$!%VWSm_kZ9!s=Ja-Zxdi2><7W184Ehi=* z^h`=1=qM*wQwU8p8@F8`?7T`Khr-cM!XTBYQB5)qt#OXYPPT*qO>k%pHzyOus6b*k z%!yeQ&%0t?m)3xiqK;&od)R3Tb`}}_eEdj; z)QV)m$T+A|82Y(@(K9-Mq0tZO%;*>F!f?c7CwC(Jx9F5{LC@%FIm!5}-8KRP!SR_X zb^v5Dy0fHY99t(%Pm}{FJNpwTn*G>1F#8i=aHV==EImqrVpa?Y_X9DjrAOJUu7WVG zkt)pY4MZ4y{pzD&Y*AY>PIovJV|NOn*y#>X4D?d-$HuA0B#<9HlzWE&pL)y%d}6&* zQFb>GvpTV!&FVT6#tp)4$8Ib{7&ra(^>kJ=>2F+l+{w>wGXydRkSmXaVzw%iak0}K z2+wx{+!1{pWWe_R{-#O<;cEm@^xlhGW+`BK%f#&N#CX4Q?rG8OIG59KG)m+(*%aVGzn- zQOHr)GM1mBXw?((w#W|>7@3xzf>AByE93an?l8FVkwN!H{{=D-Y?Qi{vAD@iQ*x=4 znl5faO`T;dW9b>UA<5m7pi$GMXHZkMSY<3e>kd;jU3?ZaRjO3RC*5+*`ToSm3f!m2 zSmWFa0CqRs*SDyo*J!oC-aaR>C z#A3wDJuV}jd0qnf?M1yw6EtqUGtV=x{)r*PV;uswa>EvS{`90K;8w@vo0lwDj0{%8h!T9j9&l$1;1dJB` zHpwh}Bvog$Vor>9B;_*N;l4Vfl_(fTJKV=c`-TDB*0VG@Bu4wD&t~qt z0pnxrV%l;NI@>Q{+|OC2LO{;;TOcO~E~ z+tsfxz%aH~^o&~>mtin}ZI;9^w=!&)^DjDWz!EM*7?2s1u^sTM?D;y!-~ zAo3D->}98~^0Z67Ah*@AmtDMP>6Ln5>F7(ZxPXslYl9)TL*|I_@@Uotd}L69ok<~q zJu>KmJ)EOhxBU!>Be4>Pb9{#>0Peahp(KRA;lj5SVIGjxM3X?==7_KIQXd-CTdU(ddGW_zI2_s;(6 zLeKo$|LoaQU8qb~7ApF|zW2Kp-rHO4+T68oVSfLvg|63v2 z#p#^@E9w%xdv2lnMim==V@$y+h0KOZB8X%66q`5?O_BQMVR=*EJS=_cn@7p4`sQNk zR^L2$vtv0}$>r4nSWZ?Fd2|3JQR|9+VaMY@c;MjdpNFQdeeI3=h z0Q4cr`9OGrnb&Y2^TN{!r1IswylxmIf=Cw(5v2NqA%fI@U_|7lx`5c|i+DkjC4waD z4UP=T3c-;eqaLl?_2N+ORF zV^tE#<5eM6HIYPK74n-vVh>T0{V+>`=mXd4fOwQp;MG3zWm>6|=S34_0c>7gJ%Soe zFq+X9ef@LsIyKaI{FAZzkAEVyHWU~esT&2xMruldv5|UHP;9go<)01ydl`HYTM~KM zma>H9nioXlZ_jncd-{9golj5g z-#t6snXF7zlGBxHdTJ^*ovcnz?V3psBzh~=cyI5(Orn2gx_2f$vukEkXlEM!=Qi+v GaQZLM?3s7~ literal 0 HcmV?d00001 diff --git a/src/test/resources/statistics/flat_table/input/newMCO/year=2007/part-00000-454fa049-5b44-41be-a014-16f8c5b884a7.snappy.parquet b/src/test/resources/statistics/flat_table/input/newMCO/year=2007/part-00000-454fa049-5b44-41be-a014-16f8c5b884a7.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..13aec714ae81a7eb2d30fb710c59102ca1cb8354 GIT binary patch literal 19297 zcmeHPO>A3L7Pb>N37wFZ&}UvsC>hf}D1=};ahw)2;MmSjn>fZ!tHS(voW_ODw56me zGyDogXcnmIV#ESfED%Dd8nLKYR4A%gR8`d#iz@0M7O3i?LKUhigu1Az&YW}q-@DJI zy!SfQXeLsf_}=q<_uO;OJ@4Lg9+a}>gb^`Jqj$<^F(RQ*XMaa16iUS5Cy^X9LdIHS zT|Y=fj5edA*X-au@~FswGZm|CmWhCwW@=@X8d;kZ+S{{>*k*l0YaSh*#YGFwvA6y14hS^P}J$>-g% zCio6pzm8t;EnMcbwuBLEHOc4CFJw}Khm&SFsRMhBJ~njM&}taER9oCzKd@Mh>kO-9 zpQ-%qtI1{AYUzN3!q82)3H!)2Xq(4-y(Y zoG`-)Cop9685`wPL4xnHwHfMKxEm9EOO6;BMYw$weqfWE?OFKYyS;ZG1%k36){cKN z+2;1s=b3~1CPoHL4}WN9xNn6=D&Lbl+~cxC zGu@T?p1JM4-*3c~?HtJsrxPZA(uYfQmTqL6BsQ|&$iCj#pa5G7@sx=lQ3TveoihVuh_}*ASgP zHelCX^$Nc}N)Ro3YTxk>mD-RhXijCS5$=q;!yV;gVyqJo=K)~zg zsh!#YM4QQ9L=bG|3|EPS?w+mCXyS)+0riOFf^GpY{BvS;-3PY~$EQK}Y?2)*Z0?qufjmLsl{wB%x0^_9n+h@qsaGi#r{2?ovoG& zv!Z`&V#fBpn4PqLpDf$Hm!>PW@0G$f+j#h0<*yWW*}u<}tJz{vj9-E8(=(IeJLq05 z6rUI0$Dm)CPv_3be05@`QmR6S5GoXBt1vO~9ZA{u7BEWW^?GfA+M{baH&e}4tJ&O6 z_$Lsexma>*&gFA%&G{Kyb2sB=+Env}DXtOcpPtEcB2}r-KvRocl zlgpP2V5D8Hs+KDi)n0tyGMKee5N0LKqt)tYeiGCy+0 znH`GWNKhDs;kG#mMhg?w;^;O?KqdpSIqI$ZO@Y@s?nLF`SZWS|EC`}o9kH93@y zr(UE>ibizy4L9javp!sNt{NN2soHoshr_uwE89?zfb`CS*f_Fz+?`}x-MgwZBT>Br zotQYVvk{iS?lLF|4yiX*3R&8CKQkmsLSwa3o~h6tZhO*$UQPR-~7LR>3o?pT_F7)FDaYU|R>WEH}>`681NedkVEIY^u+2gG?*<%q!mK}10?6HVV_Q)z#mL7nF?2%PA z*;}eCI~)nwTXxxNZIqYTh3vo`(?s?U(K+i{TLvn1vRY@IZ&$QHX^7{7`Fy)gi!uM}B! znj^9whoi^C(4A4MY3kq9c~&|po*0Rp{s?|Jy470g%#Z?`b@W~-WjhlJ(WCb=(Q}Gu zJ3E^RsdHRvsY?>JlhBr>*2ZC%x)|3fkZNI@f}AP|(PH#1>r@xm%~5l+zFZ@1L86X4 zc@iM{^p|VEe4^TB{bQ}vLqO=Mlt6YyUj!Knp_zBL(_l04opu|nD|NE=s4s!dHCXXWT$B{-f;uQli}o8R6_W8%1K+mjzw*- zBM(RzABja%0(mX^j&La4?Ceg2 z4@UoLoiQPRDD^h$RF6Y327&`L0WA@~IMtJ31va(LW}RIxjn7A9=YIl5!=GIbhL4dn z1%6V)pMH=6#k^ot1i~Fb%d-t0od(@fN=Yt zc$m(PRwS%X`t)(w&4xe@0OXTCP|#6+vp)E>I}o1r1h^~uJCFedZRI!X8iT2h#qLG~ zMxF`R0Ozc@SwF?}$zdFTO)Vz(Qw%02YHil#M|2$#0#_>nL6^V$2y=~(H%dR^%cWen z2mr1A@kUV2QH`^fHp#ViUcBg~jNs9Umo~X9_pK&)dT|r@TW=y8sd(y215F0qCgBS>HeA0;E?&0(>XB6b7LT7PhqFoONZhzNUD=I@m-^j?B-d4fMolmE zqo!)X&ARxcJ51H|;*+4MQfafk8IYdHLm@tg;0i{@LNEMg02aehce9qCatFeO7Qp4F z07xHR7_evz{U|QoM;#Ha1q6tG`%&D*Z$3=8j|g{YG2({_ml1C;AjScH;ZZN&V%Qsq zc81pLNvSX2tI7pkfUhTAz&{Q)11AOe$3YkH)uCqKK>@xxJZLBRM#JpZ)qcy%TtVSF#J1c-o~$=Dz#pP_21rwW!FArO4doP35o zXRd*;?Vt$f73cBo+A|PIM5Z6T-@5u6X$r0uOR5MnT>TB0!BK9rt~^V}Q3TCfE}G zpa}B-uI91?;s=iSY*NxY5G2PX5T8x5F)lVnkldF*T;zyj+Zv8Ra$*8;Y#SS6r6Gdm z$^>GCBaUu&@$>@8p$Xxm+Zp{ir?AYd5#jmh%htK=a3j)cf!2vB$2b600|lP2PfV$S z0!xC{k!cAN**5h^kIQ&un(248ku*QYqx4xW{ch2PTh$4r-z`$^==sTLB-GPuZhmFq zrGeVP8dMGkND*=1(5w3fpqlvV;rimh!Yh9n*k50)&DR!d=F;$+vBlR9)?-^^hZYwO z?_P}c$M)>4@0&jq+xpu#V+U&cv5VClp-;;0J@68IKX7>8zS`VA5CU|)KgNk;Ti<;9 z?S4VPa)jOqSfbE70m~kGCtxW>9|Sbd=$(KiAH5T>jHEvTxGMM|AjM)I1oYYHj{qKF z-U(R7(>no6d3q;ME9Bl;SW3@30n4F!C%~e;MDIPYSbwRG4YwInun;2Ct&#}h*e{Ar zoQLLMeeHsY1D~UWhfRYLJ&qZI-@Xtf@ z#r}9?Dmf4yNO1b*VToqR<7x3_;@LkDOG-;3FJ6R}rfl+(Kl@*k;mDw zGRH+zo;^?`hN0Q!(*ejq%-%xgT5dEw~^hv3Md6cHdhp8E_oTNZ2W$1j;NX@$ a$&o$tgL~3@cJJ97I+ce1=MenQwf+rIHn$T1 literal 0 HcmV?d00001 diff --git a/src/test/resources/statistics/flat_table/input/newMCO/year=2008/part-00000-ca33fc10-032a-40cc-929e-13a34fe590dc.snappy.parquet b/src/test/resources/statistics/flat_table/input/newMCO/year=2008/part-00000-ca33fc10-032a-40cc-929e-13a34fe590dc.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..94defa45ca929a0327ab26724534fc3739a51836 GIT binary patch literal 19349 zcmd^HUu;xW8lRaCrLq+fx#KNXtQD@Pwa%YTr^Qj3PKW-nosKhwp!;WbN?Vs*KrF>w z|ArXj!-g0`HinRmA;gDfV+bLJ7#@rdg!r&Agz#VtA%yTi3?YW_zyopj`_BJ+&Yc2t zFWF5bVVF7R`~A*$zVn@Pzw>=pOSy8widweSK4wL%XeiX$)e;JY5^?yGNT#fiwZdA} z1rkxK*=lLGTX>KB76bWEOM3)9o0>vR0LfY#5fWmM6q5C&lXeUFmhR~#kTvrZg5&WH z@g2m0VVSj@skLN97{ua+KH<0>j_Vry8h{~dt+igUa-=_PNBXHLRt{3j;%9%He5U^= z)&$>S>(|l_z79~es*#a5eq-hLrXj|`{maLNR>TODlZKSYCLHu zuuS&Z%6F4fcSn;doV3G96WD64V9GSF!sSSqjvPtEZTPEWjm?xu$5X^$E%843jX>1R z#5>4FX5uhuZC55ToCvlwp)`ni2O(nO;72W1E1ONU^@pXQkYUg$w%X)VpO3jU69zn- zu)_%*7_vI7wTi!^5Vh8dEcJFgAQJqP=(Dnl@X`VJ1IL-YGzWj+3t?~HL4lxbh_&Na zCJSo8vPqI&bYQxCO(BonbjQx1d(k_l0p790@8||_+a2d=;G)=SlTQ<^4{N$$&}Zp# zYbEoOrFIi&mP}*}m7Ac^#t&WL+gM-UQJJ(s9m&9nm`UrzQER0YV|v}!u>|zWq~nPm z1HObb+w>okMBM~WcrFIE+EM(}(TmRQ&Kx~Xyj|OjFqDT=V5^PJYv|#2DU>j*KycpZ zMQ7n-pLbbtx$Ep`zBiK~pLDm!Ch4X{#h@$B{>4*dwY5e8HWlJY8$V(QxSu=*nwQ=`4{anL!Y2`a=;NEZC(WlyB$>3~uZ}ew2Uo<7qQ4VS?l{!Pm8-8Zc*R$r^~MueGg4~52?+A_FjiBx0#H7KHXJZo<{xOCth-{RCL3lAdGR@mwtpSUgk>p?HNi2rw; z<}>eZF2~w4CEp8KCWsi*H0Wc`yazgnv^SX$=v@n^g7B`QtgN;Lemn-hygSp9S zxiBgEHw{m?eiw5i?%yM2*YDDJ#r3;V7@5h#mOp6O#DVt_B#SbiM~Bko1^wvKAxYb=Bm|P zehd5+2+>?Dc{S$;^Ipw^6Rzf!jGJjw9W0D-jX3}K#2_a^#W#Ro2fSt|u$R?<9Btqu&1fSM(jnyfiLF^J8mCYR0)fr`bV z**H3}SV3V7^krBdt~pnY4Wz3!RL~6<+QV&6deCc%iFzH$dL1e6jxrxz zAEPoc;*F@3nZ5?1oUenZ)a#hsM7^I=m2#Ep*a)0!1;ZUnV{nGi!%MYwX=0=b$G3YE zDd&MIOO9>aiK>fX8qSyFar z2IC#W1?E5xGGD;n;yNbOW|dh0r+> zQHEu~3c>z4HV2(qC;{*)fV0q~$g)62$SyRwWDi9ZSr+67*+Ws6?EWRHEDeB!?EWP# z*7qSC4j}zHH#HO5|o3l{OlNCMZ>{3Mwl!o(MFrQuO(&Ci5 zSS;icvZr9J6sw)MPvxb1o5;Qw`zY+3xGzf!b4q#6Ik`-OK*T2KPc8#`QIT`L{EZ^Z zqB$aaI2>CDLw8oKxT)W%^PFo{@GSZhW1VY-u|&bmIeovBbw!2*usilQkN~SHbvEal zE9%h9jB3vSMt2NWzh{&I9hDE7<)i1n_qIhR(nTM@RMNIXggosYc- zJz#OIeCAwuP|l2;S2AW5#<=hx7(*1!oHHw>b^=1CeW%^jZbY`Tk*ZJQjzG@xFf;#*+up zx3+}v@st={z`nIzu;XnK#%IRPVFGz8_JMP}ElY}JvSQ{OTTNl;!U9I)djdnFA6w0g z{u2{MqZ45WPg&GYgmIJnv>G;vQaW=kJ)$YbKoEKp(1*k?Ep^|H`8jDCPyDA`osDVqRAsW%D|+Ml%AcNN*iP7`^?-8Zfsg z8auZD!nYt-jNM3tVz&T-xdS!9}Dfob)$pL>geO9 z;n{tM2;)QW=;L%&w3guV2(biHW>b{!*-Y~c_l0osrIs6PhmSsiGIoRc;DY=zOO%HaVre=Z8IrOa8kmT}8 z(5UI5XHiqN4Cfr|_J*mN9_$89mHM0W>2uOi_$h+V7Pyd+vCzXleGV2wtHwFk>i}W1 z3gC4FYE?PsTwESM>WFYjAV54&&&2_#pIc`V-m1clT8wxm;W6Uzqy+Nok9rj+Xxw|p zlWZ$oWeD+XhvQheq6Rq0lwU`7&s-smwP$7rWE$bH4q)x)hzStH!K_<}S%AA_Y&gc&Zr0A|o?c+UA3 z>3C{9!%{3lx$`eFVa_pO z_36zrHbfYg<=jTFv8d`fr*qz#sdvEBIe^GJ;MW7vWZVIG-e#~Qx7F7J9^Ug+elf6g z^sn+B;A4aJ!H^sxbHwxV*q{ga%Yp=(StWt}vfzPT*d(!h%KQw2%##2wZ1MnqF+{5k-B4$ z?3Y07A7Nvhu8W{qF@ZSE5&K3xJh4EsWkeVwi+!Woh5sf43ERqC8Uf++`X+!#4N<8i zIzPUw?JEGP#|6g6KfbIU7g$krPK-&I$gb%D`H3+ypzAd^XJNb!QjN$L#$iNvME+4x znnjGLYKq~{M@7K73W~8Kqp@hHt=)d=jk(vmYrAVuN$e&?#qK?C?&yZ<;+uPC=DX+K z_-pshnfcmuZN6rI*!ynh{M)-{IyZFgnV;LcZN9Usb9Vd8j_Ey}8-Dw4=dRjL>|!-Y z=##SBcfAh3ckSJ=qc*hzgaAF$)yauFH@y4)`(1*75S1KKzNOMx~{pjONqXJLuD#tB%y zv~dC~;!E`QUGp=q&tStHo+(%ukx5iZ1n1aYolTsFW>$Ukuym_$9+q|W&7&k_eRHua zt#2NTqgd)zay9AzER`#X8gu|9zw4iizO><=hh~BO@n9mAz7`S)52P=B^RV=>jJ_#6li1k?U`=-WfS zd06&45FSX0`{$7_G)b;TJ<|OWRB(JWvmP8D&AJD{N8SXJL=A2l_C{AAJielD_FfI@ z0~zrE^daf|KzM?g*H9qy!srB&|8ic9ZWttjlP(w{C=LWe1jT~Dh{$_<0kP3{|AHh- z21%9*jtmMB!I42>B0#cokeZeA0cutWl5BF5f6bHvh$L(95y7e={&{GrMDjFxbg^us zBx-PCtga$?8dZqZStL=T3i(YSv4<$heiSN!=mXd4fOwRnpi%qC_jjdE4KJD?3t;o| zdJ|NAg3*k=TkM}psS5cgV^t#mL}~>oFg8+M3XF|Zo&sYdRjHuZXvNAu8~i6a`0lqP zYGhl=0+(-|K+bjLb##9UaU$PWcqD3YYQKL!WRI}g*t0w5I^LR@+q3wZT6|qzccXv KDT4nY*M9+f@7C-9 literal 0 HcmV?d00001 diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatisticsSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatisticsSuite.scala new file mode 100644 index 0000000..a1e7daa --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatisticsSuite.scala @@ -0,0 +1,122 @@ +package fr.polytechnique.cmap.cnam.statistics + +import org.apache.spark.sql.DataFrame +import fr.polytechnique.cmap.cnam.{SharedContext, utilities} + +class CustomStatisticsSuite extends SharedContext { + + def getSampleDf: DataFrame = { + val srcFilePath: String = "src/test/resources/statistics/custom-statistics/IR_BEN_R.csv" + val sampleDf = sqlContext + .read + .option("header", "true") + .option("inferSchema", "true") + .option("delimiter", ";") + .csv(srcFilePath) + sampleDf + } + + "customDescribe" should "compute statistics on all columns" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val inputDf = getSampleDf + .select("BEN_CDI_NIR", "BEN_DTE_MAJ", "BEN_SEX_COD", "MAX_TRT_DTD", "ORG_CLE_NEW", "NUM_ENQ") + val expected: DataFrame = Seq( + ("0", "0", 2L, 1L, "0", "0", "0.0", "BEN_CDI_NIR"), + ("01/01/2006", "25/01/2006", 2L, 2L, "NA", "NA", "NA", "BEN_DTE_MAJ"), + ("1", "2", 2L, 2L, "3", "3", "1.5", "BEN_SEX_COD"), + ("07/03/2008", "07/03/2008", 1L, 1L, "NA", "NA", "NA", "MAX_TRT_DTD"), + ("CODE1234", "CODE1234", 2L, 1L, "NA", "NA", "NA", "ORG_CLE_NEW"), + ("Patient_01", "Patient_02", 2L, 2L, "NA", "NA", "NA", "NUM_ENQ") + ).toDF("Min", "Max", "Count", "CountDistinct", "Sum", "SumDistinct", "Avg", "ColName") + + // When + import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ + val result = inputDf.customDescribe(inputDf.columns) + + // Then + result.show + result.printSchema + expected.show + expected.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(expected sameAs result) + } + + it should "compute only disctinct statistics when distinct only flag is set" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val inputDf = getSampleDf + .select("BEN_CDI_NIR", "BEN_DTE_MAJ", "BEN_SEX_COD", "MAX_TRT_DTD", "ORG_CLE_NEW", "NUM_ENQ") + val expected: DataFrame = Seq( + ("0", "0", 1L, "0", "BEN_CDI_NIR"), + ("01/01/2006", "25/01/2006", 2L, "NA", "BEN_DTE_MAJ"), + ("1", "2", 2L, "3", "BEN_SEX_COD"), + ("07/03/2008", "07/03/2008", 1L, "NA", "MAX_TRT_DTD"), + ("CODE1234", "CODE1234", 1L, "NA", "ORG_CLE_NEW"), + ("Patient_01", "Patient_02", 2L, "NA", "NUM_ENQ") + ).toDF("Min", "Max", "CountDistinct", "SumDistinct", "ColName") + + // When + import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ + val result = inputDf.customDescribe(inputDf.columns, distinctOnly = true) + + // Then + result.show + result.printSchema + expected.show + expected.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(expected sameAs result) + } + + it should "compute statistics on specified columns" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val inputDf = getSampleDf + val inputColumns = Array("BEN_TOP_CNS", "BEN_DCD_DTE", "NUM_ENQ") + val expected = { + Seq( + ("1", "1", 2L, 1L, "2", "1", "1.0", "BEN_TOP_CNS"), + ("25/01/2008", "25/01/2008", 1L, 1L, "NA", "NA", "NA", "BEN_DCD_DTE"), + ("Patient_01", "Patient_02", 2L, 2L, "NA", "NA", "NA", "NUM_ENQ") + ).toDF("Min", "Max", "Count", "CountDistinct", "Sum", "SumDistinct", "Avg", "ColName") + } + + // When + import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ + val resultColumns = inputDf.customDescribe(inputColumns) + + // Then + import utilities.DFUtils.CSVDataFrame + assert(resultColumns sameAs expected) + } + + it should "throw an exception" in { + + // Given + val givenDF = getSampleDf + val invalidCols = Array("NUM_ENQ", "INVALID_COLUMN") + + // When + import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._ + val thrown = intercept[java.lang.IllegalArgumentException] { + givenDF.customDescribe(invalidCols).count + } + + // Then + assert(thrown.getMessage.matches("Field \"[^\"]*\" does not exist.")) + } + +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala new file mode 100644 index 0000000..69b6e26 --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfigSuite.scala @@ -0,0 +1,25 @@ +package fr.polytechnique.cmap.cnam.statistics + +import fr.polytechnique.cmap.cnam.SharedContext + +class StatisticsConfigSuite extends SharedContext { + + "toString" should "return all the values from the test config file correctly" in { + + // Given + val expectedResult = "flatTableName -> MCO \n" + + "mainTableName -> MCO_C \n" + + "dateFormat -> dd/MM/yyyy \n" + + "inputPath -> src/test/resources/statistics/flat_table/input/newMCO \n" + + "statOutputPath -> target/test/output/statistics/newMCO" + + // When + import fr.polytechnique.cmap.cnam.statistics.StatisticsConfig._ + val result = StatisticsConfig.newFlatConfig.head.prettyPrint + + println(result) + println(expectedResult) + assert(result === expectedResult) + } + +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala new file mode 100644 index 0000000..a848c3c --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala @@ -0,0 +1,187 @@ +package fr.polytechnique.cmap.cnam.statistics + +import java.sql.Date +import org.apache.spark.sql.types._ +import fr.polytechnique.cmap.cnam.{SharedContext, utilities} + +class StatisticsMainSuite extends SharedContext{ + + "changeColumnNameDelimiter" should "change the column name delimiters from dot to underscore " in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val inputDf = Seq( + (1, 1.0, "val1", "val2") + ).toDF("key1", "noDelimiter", "key.delimiter1", "key.delimiter2") + + val expectedResult = Seq( + (1, 1.0, "val1", "val2") + ).toDF("key1", "noDelimiter", "key__delimiter1", "key__delimiter2") + + // When + import StatisticsMain.OldFlatHelper + val result = inputDf.changeColumnNameDelimiter + + // Then + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + } + + "changeSchema" should "change the column types to the passed format" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val inputDf = Seq( + ("1", "1", "1", "2006-2-20") + ).toDF("BEN_CTA_TYP", "CPL_REM_BSE", "ER_PHA_F__PHA_PRS_C13", "FLX_TRT_DTD") + + val inputSchema: Map[String, List[(String, String)]] = Map( + "ER_PRS_F" -> List(("BEN_CTA_TYP", "Integer"), ("CPL_REM_BSE", "Double"), ("FLX_TRT_DTD", "Date")), + "ER_PHA_F" -> List(("PHA_PRS_C13", "Long")) + ) + val expectedResult = Seq( + (1, 1.0, 1L, Date.valueOf("2006-02-20")) + ).toDF(inputDf.columns: _*) + + val mainTableName = "ER_PRS_F" + val dateFormat = "yyyy-MM-dd" + + // When + import StatisticsMain.OldFlatHelper + val result = inputDf.changeSchema(inputSchema, mainTableName, dateFormat) + + // Then + expectedResult.show + expectedResult.printSchema + result.show + result.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(inputDf.schema != result.schema) + assert(result sameAs expectedResult) + } + + it should "consider default date format as dd/MM/yyyy when it is not specified explicitly" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val inputColumn = "FLX_TRT_DTD" + val inputDf = Seq("20/02/2006").toDF(inputColumn) + + import org.apache.spark.sql.functions._ + val dateFormat = "dd/MM/yyyy" + val expectedResultColumn = to_date( + unix_timestamp(inputDf(inputColumn), dateFormat).cast(TimestampType) + ).as(inputColumn) + + val expectedResult = inputDf.select(expectedResultColumn) + val schemaMap = Map("ER_PRS_F" -> List(("FLX_TRT_DTD", "Date"))) + val mainTableName = "ER_PRS_F" + + // When + import StatisticsMain.OldFlatHelper + val result = inputDf.changeSchema(schemaMap, mainTableName) + + // Then + inputDf.printSchema + inputDf.show + expectedResult.show + result.show + result.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + assert(inputDf.schema != result.schema) + } + + "annotateJoiningTablesColumns" should "prefix table name to the column names of the joining " + + "tables" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val testInput: List[(String, List[(String,String)])] = List( + "ER_PRS_F" -> List(("Col1", "Type1"), ("Col2", "Type2")), + "ER_PHA_F" -> List(("Col1", "Type1"), ("Col2", "Type2")), + "Random" -> List(("Col1", "Type1"), ("Col2", "Type2")) + ) + val expectedResult: List[Map[String, String]] = List( + Map("Col1" -> "Type1", "Col2" -> "Type2"), + Map("ER_PHA_F__Col1" -> "Type1", "ER_PHA_F__Col2" -> "Type2"), + Map("Random__Col1" -> "Type1", "Random__Col2" -> "Type2") + ) + val mainTableName = "ER_PRS_F" + val sampleDf = Seq("dummyValue").toDF("Col_1") + + // When + import StatisticsMain.OldFlatHelper + val testResult = testInput.map(testInput => sampleDf.annotateJoiningTablesColumns(testInput, mainTableName)) + + // Then + testResult foreach println + assert(testResult === expectedResult) + } + + "prefixColumnName" should "concatenate two given string with __ (double underscores)" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val tableName = "ER_PHA_F" + val columnName = "FLX_TRT_DTD" + val expectedResult = tableName + "__" + columnName + val sampleDf = Seq("dummyValue").toDF("Col_1") + + // When + import StatisticsMain.OldFlatHelper + val result = sampleDf.prefixColName(tableName, columnName) + + // Then + assert(result == expectedResult) + } + + "writeStatistics" should "compute statistics on the input DF and write it in a given path" in { + + // Given + val inputDfPath = "src/test/resources/statistics/flat_table/input/newMCO" + val expectedResultPath = "src/test/resources/statistics/flat_table/expected/newMCOStat" + val resultPath = "target/test/output/statistics/newMCO" + val inputDf = sqlContext.read.option("mergeSchema", "true").parquet(inputDfPath).drop("year") + val expectedResult = sqlContext.read.parquet(expectedResultPath) + + // When + import StatisticsMain.OldFlatHelper + inputDf.writeStatistics(resultPath) + val result = sqlContext.read.parquet(resultPath) + + // Then + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + + } + + "run" should "run the overall pipeline correctly without any error" in { + + // Given + val expectedResultPath = "src/test/resources/statistics/flat_table/expected/newMCOStat" + val expectedResult = sqlContext.read.parquet(expectedResultPath) + val resultPath = "target/test/output/statistics/newMCO" + + // When + StatisticsMain.run(sqlContext, Map()) + val result = sqlContext.read.parquet(resultPath) + + // Then + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + + } +} From c15b84283a97caa7962c5d5f9b85b26778383e32 Mon Sep 17 00:00:00 2001 From: sathiya Date: Tue, 16 May 2017 18:52:51 +0200 Subject: [PATCH 2/2] CNAM-181 Review Comments --- .../cnam/statistics/CustomStatistics.scala | 55 +++--- .../cmap/cnam/statistics/OldFlatHelper.scala | 73 +++++++ .../cnam/statistics/StatisticsConfig.scala | 33 ++-- .../cmap/cnam/statistics/StatisticsMain.scala | 73 +------ ...3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet | Bin 0 -> 3428 bytes ...2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet | Bin 3440 -> 0 bytes .../statistics/CustomStatisticsSuite.scala | 38 ++-- .../cnam/statistics/OldFlatHelperSuite.scala | 182 ++++++++++++++++++ .../cnam/statistics/StatisticsMainSuite.scala | 164 ---------------- 9 files changed, 330 insertions(+), 288 deletions(-) create mode 100644 src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala create mode 100644 src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet delete mode 100644 src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet create mode 100644 src/test/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelperSuite.scala diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala index e8a1fa6..370d5e3 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/CustomStatistics.scala @@ -2,39 +2,40 @@ package fr.polytechnique.cmap.cnam.statistics import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{NumericType, StructType} +import org.apache.spark.sql.types.{NumericType, StructField, StructType} object CustomStatistics { implicit class Statistics(val df: DataFrame) { - def customDescribe(inputColumns: Array[String], distinctOnly: Boolean = false): DataFrame = { + def customDescribe(inputColumns: Seq[String], distinctOnly: Boolean = false): DataFrame = { - def computeAvailableAgg(schema: StructType, - colName: String): DataFrame = - colName match { - case numericColumn if schema.apply(numericColumn).dataType.isInstanceOf[NumericType] => - df.select(numericColumn) + def computeAvailableAgg( + schema: StructType, + colName: String): DataFrame = df.schema(colName) match { + + case StructField(numericColumn: String, _: NumericType, _, _) => + df.select(numericColumn) .agg( - min(numericColumn) cast "string" as "Min", - max(numericColumn) cast "string" as "Max", - count(numericColumn) cast "long" as "Count", - countDistinct(numericColumn) cast "long" as "CountDistinct", - sum(numericColumn) cast "string" as "Sum", - sumDistinct(numericColumn) cast "string" as "SumDistinct", - avg(numericColumn) cast "string" as "Avg" + min(numericColumn).cast("string").as("Min"), + max(numericColumn).cast("string").as("Max"), + count(numericColumn).cast("long").as("Count"), + countDistinct(numericColumn).cast("long").as("CountDistinct"), + sum(numericColumn).cast("double").as("Sum"), + sumDistinct(numericColumn).cast("double").as("SumDistinct"), + avg(numericColumn).cast("double").as("Avg") ).withColumn("ColName", lit(numericColumn)) - case _ => - df.select(colName) + case _ => + df.select(colName) .agg( - min(colName) cast "string" as "Min", - max(colName) cast "string" as "Max", - count(colName) cast "long" as "Count", - countDistinct(colName) cast "long" as "CountDistinct" - ).withColumn("Sum", lit("NA")) - .withColumn("SumDistinct", lit("NA")) - .withColumn("Avg", lit("NA")) + min(colName).cast("string").as("Min"), + max(colName).cast("string").as("Max"), + count(colName).cast("long").as("Count"), + countDistinct(colName).cast("long").as("CountDistinct") + ).withColumn("Sum", lit(null).cast("double")) + .withColumn("SumDistinct", lit(null).cast("double")) + .withColumn("Avg", lit(null).cast("double")) .withColumn("ColName", lit(colName)) } @@ -44,12 +45,10 @@ object CustomStatistics { if (distinctOnly) { outputDF - .drop("Count") - .drop("Sum") - .drop("Avg") - } - else + .drop("Count", "Sum", "Avg") + } else { outputDF + } } } } diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala new file mode 100644 index 0000000..137e7c7 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/OldFlatHelper.scala @@ -0,0 +1,73 @@ +package fr.polytechnique.cmap.cnam.statistics + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col +import fr.polytechnique.cmap.cnam.statistics +import fr.polytechnique.cmap.cnam.utilities.DFUtils + +object OldFlatHelper { + + implicit class ImplicitDF(data: DataFrame) { + + final val OldDelimiter: String = "\\." + final val NewDelimiter: String = "__" + + def changeColumnNameDelimiter: DataFrame = { + val renamedColumns = data.columns.map(columnName => { + val splittedColName = columnName.split(OldDelimiter) + if (splittedColName.size == 2) { + col("`" + columnName + "`").as(splittedColName(0) + NewDelimiter + splittedColName(1)) + } else { + col(columnName) + } + }) + + data.select(renamedColumns: _*) + } + + def changeSchema( + schema: List[TableSchema], + mainTableName: String, + dateFormat: String = "dd/MM/yyyy"): DataFrame = { + + val unknownColumnNameType = Map("HOS_NNE_MAM" -> "String") + + val flatSchema: Map[String, String] = schema.map(tableSchema => + annotateJoiningTablesColumns(tableSchema, mainTableName) + ).reduce(_ ++ _) ++ unknownColumnNameType + + DFUtils.applySchema(data, flatSchema, dateFormat) + } + + def annotateJoiningTablesColumns( + tableSchema: TableSchema, + mainTableName: String): Map[String, String] = { + + val tableName: String = tableSchema.tableName + val columnTypeMap: Map[String, String] = tableSchema.columnTypes + + tableName match { + case `mainTableName` => columnTypeMap + case _ => columnTypeMap.map { + case (colName, colType) => (prefixColName(tableName, colName), colType) + } + } + } + + def prefixColName(tableName: String, columnName: String): String = { + tableName + NewDelimiter + columnName + } + + import statistics.CustomStatistics._ + + def computeStatistics: DataFrame = data.customDescribe(data.columns) + + def writeStatistics(outputPath: String): Unit = { + data + .computeStatistics + .write + .parquet(outputPath) + } + } + +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala index aed6469..3974295 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsConfig.scala @@ -17,24 +17,33 @@ object StatisticsConfig { newConfig.withFallback(defaultConfig).resolve() } - val oldFlatConfig: List[Config] = if(conf.hasPath("old_flat")) - conf.getConfigList("old_flat").asScala.toList - else - List[Config]() + val oldFlatConfig: List[Config] = { + if(conf.hasPath("old_flat")) { + conf.getConfigList("old_flat").asScala.toList + } else { + List[Config]() + } + } - val newFlatConfig: List[Config] = if(conf.hasPath("new_flat")) - conf.getConfigList("new_flat").asScala.toList - else - List[Config]() + val newFlatConfig: List[Config] = { + if(conf.hasPath("new_flat")) { + conf.getConfigList("new_flat").asScala.toList + } else { + List[Config]() + } + } implicit class StatConfig(statConf: Config) { val flatTableName: String = statConf.getString("name") val mainTableName: String = statConf.getString("main_table") - val dateFormat: String = if(statConf.hasPath("date_format")) - statConf.getString("date_format") - else - "dd/MM/yyyy" + val dateFormat: String = { + if(statConf.hasPath("date_format")) { + statConf.getString("date_format") + } else { + "dd/MM/yyyy" + } + } val inputPath: String = statConf.getString("input_path") val statOutputPath: String = statConf.getString("output_stat_path") diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala index ffddefd..c1a1d42 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMain.scala @@ -1,83 +1,28 @@ package fr.polytechnique.cmap.cnam.statistics -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +import org.apache.spark.sql.{Dataset, SQLContext} import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig -import fr.polytechnique.cmap.cnam.utilities.DFUtils -import fr.polytechnique.cmap.cnam.{Main, statistics, utilities} +import fr.polytechnique.cmap.cnam.{Main, utilities} + +case class TableSchema(tableName: String, columnTypes: Map[String, String]) object StatisticsMain extends Main { override def appName = "Statistics" - implicit class OldFlatHelper(data: DataFrame) { - - final val OldDelimiter: String = "\\." - final val NewDelimiter: String = "__" - - def changeColumnNameDelimiter: DataFrame = { - val renamedColumns = data.columns - .map( - columnName => { - val splittedColName = columnName.split(OldDelimiter) - if (splittedColName.size == 2) - col("`" + columnName + "`").as(splittedColName(0) + NewDelimiter + splittedColName(1)) - else - col(columnName) - }) - - data.select(renamedColumns: _*) - } - - def changeSchema(schema: Map[String, List[(String,String)]], - mainTableName: String, - dateFormat: String = "dd/MM/yyyy"): DataFrame = { - - val unknownColumnNameType = Map("HOS_NNE_MAM" -> "String") - val flatSchema: Map[String, String] = schema - .map(tableColumns => annotateJoiningTablesColumns(tableColumns, mainTableName)) - .reduce(_ ++ _) ++ unknownColumnNameType - - DFUtils.applySchema(data, flatSchema, dateFormat) - } - - def annotateJoiningTablesColumns(tableSchema: (String, List[(String,String)]), - mainTableName: String): Map[String, String] = { - val tableName: String = tableSchema._1 - val columnTypeList: List[(String, String)] = tableSchema._2 - - tableName match { - case `mainTableName` => columnTypeList.toMap - case _ => columnTypeList.map(x => (prefixColName(tableName, x._1), x._2)).toMap - } - } - - def prefixColName(tableName: String, columnName: String): String = { - tableName + NewDelimiter + columnName - } - - import statistics.CustomStatistics._ - def computeStatistics: DataFrame = data.customDescribe(data.columns) - - def writeStatistics(outputPath: String): Unit = { - data - .computeStatistics - .write - .parquet(outputPath) - } - } - def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = { argsMap.get("conf").foreach(sqlContext.setConf("conf", _)) argsMap.get("env").foreach(sqlContext.setConf("env", _)) import utilities.DFUtils._ - val tablesSchema: Map[String, List[(String,String)]] = FlatteningConfig.columnTypes + val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x => + TableSchema(x._1, x._2.toMap)).toList + import OldFlatHelper.ImplicitDF import StatisticsConfig.StatConfig - StatisticsConfig.oldFlatConfig.foreach {conf => + StatisticsConfig.oldFlatConfig.foreach { conf => logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}") println(conf.prettyPrint) @@ -88,7 +33,7 @@ object StatisticsMain extends Main { .writeStatistics(conf.statOutputPath) } - StatisticsConfig.newFlatConfig.foreach {conf => + StatisticsConfig.newFlatConfig.foreach { conf => logger.info(s"Computing Statistics on the new Flat: ${conf.flatTableName}") println(conf.prettyPrint) diff --git a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-5ab4b7e5-3b53-4e16-bc0b-b2ba54555cc6.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..dcf19336ba23b0b2674ed075ca85f381b6abaf92 GIT binary patch literal 3428 zcmb_fU2NOd6~0#@?JJA2X<3x1)WT##R@F}7Nb1k_lvI>NOLFxW(hi(>n>dil? zf7&cRg<_;b)9t}9G9W;QwJ5MWv_rRb58Hq|6x+~;0_(5<>wr9T!wU2*L$Lu}5Ol%L zm7Lhlk|GFp0bXA5o^!r)zWd!vs-_u=zz9u7c-LMMG6I88(-t8JAw`C!s8N9%h{vKB zp2(OD#J~WlvYf!K5v2>pwffq8V`16ZWJ1xLnHX>Jm9Me_VV#Wa-;*l z#4FFlEhOVKY=Fz zUlY`t?Qo_vSxE&%fVv zQHs%wXAq&{7T-bPPYRWi&`se-5MnVSrrA!WIqx7Y%q`pyG=~$#U1E@=+nFTOkVRU^ z?cN2g%Bk%x4ldAg182LTmyqK;cj|o+v;0{vvO8Xl6@U6$`A5C;Xn;o=eJBEi2s z_0HS0bw4YmteBmY}%9#hg{Bnl)>@YKWaVZPGII$(E*m<_gpO zs!wWGWvYywjA05+aoYlW$#|jK>m(JeC^(s-0mE9^gx=IAdcoS2fzhl3nO^ZjEj0K4rqX)5@DIFQ~`V)h5sH}d7 zu$$D3!~RPieNwmbh2by~_P;4Z*nFXEsj;LSe+HWqK7BG{#RT_C8*rU;yPu( z(A5OujjVN8mmbF?Z{#xxgxcL$0$yiFf6)SGa(U?w7zY_+O4_0LME6nljHtKu%?+-sR+xaOJCSX ztyCTRCKhYeq6Gpu@SCw2JHXc&?F)a<1LRqgN2D1$GB3U%1rZMt0XN2u-HEFjIAiZrCQ;%kxXaaGtiu^;2}r3?q@vF0U`Gwa2d3 zI5)q#HotTno^??_bA7Qra>D4IIqk(0G$Il^v&##W`eFm3daq*NfbRz(G9YliIO$+~ zEo{LC6W)Ka-YaZ~q;`E6gx+O$Th>ptE#Kp|PRWwUzbbYeSKtx%tMz?CQ|ivuB2uKqfE+a)jqQcJoUo;P=w{!a{v! z0Sv&i5gED@bZG3%xpR>n=lZEfcW#M%$9-Y>9^|?D!s>&e_iT|zjsp7G@ts{>rzJmh z^}F`rqXz*MJ-YMuZr=$23w`eb-jBwk=a5+37R~;}mB5RQmDTy>rGO%hNJ=0)vp&Bt z8;I3s>ap2+BRMl8&&C?FGsowWqiVF?P@>V%Idx=iHaeG_J3jXW`UiZJ)!_fte*u!j B)1&|Z literal 0 HcmV?d00001 diff --git a/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet b/src/test/resources/statistics/flat_table/expected/newMCOStat/part-00000-7d919f75-2fc7-4e80-8a2c-d27f62e774a3.snappy.parquet deleted file mode 100644 index 94f6a04f485b3a9ed83c0483a897e0e55e88e8cb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3440 zcmb_fe{3699e?k9=bY@}yxr;>-C7#FJP9kKjf*+ZWrZfAubj(vI%xG0@Px8BW?|r`C&-Z=a_j`A)>!w0jvdNM+9A%OvEJB-1lt&0DGBibv5usm;$1u#u zm>Y-#0I9N^z@C({561Pz`a*Mg-Ii6{!>SCI#hHFNCC31h!UC(vE@8L}a5^+aX4P007E=IqZQ;1b4mk_dluFb^BBQz|QXR0e)*h*c$JB_rmbxV6@BPv+rlF z4DeZBsb|`?b>jH!^`*}}2^ikbaFxQDmjS=|rONOHAb67{cRydC>p5sX-+^%mw*#dg({nc2IqvmR9cHlI~qx9 z9IPZYS&36|HQXP8)9fgcRB$(FoWfmvURLlJn#GDYK{e(GNb1Dzc-34&K8$^BflHn~ zZ)`FlSGYbFSfEpJ4#hFYy~@2*%q0?1nko#3_J0!k=38>%NBq{`#(y)!-%^i!eZ#f2 z*}U`a#@)C1ueOEWU%c2B`YZ3nw?XI=BwHo|m04z;$b}K(>LM69Jo_?q!?aPEX@eY? zB-Q&Fz|#!iot+gV(wuL%0PRPvI1Zq`5;Kkvzlm6JV3_*wqH!-`Sd7L?+i!gbW`Mp< z#>-?MrH{Y@&^ujSEU@z#dak=0u$M{wKVc7Fbp&}du*shI`2<2gx#alr!u#mCpKW@m z#AvQ}5TVkx*GcK;$$Xi3DZQGfgjJmenxH^cj$?6{Y~u6s}p4hi{S=f8*9w=vp0d-umne?a(n+oTEJ;#`qWn)0wEjRN5FbYH4NNZM`(hlfcVeJA zC{jD~V{u8w-EmoxY1L@yR5BrH*pFhaEp7!b~Y|7i(7Co~W5(S6-j8O=GH^fj{s>Xq?8BZda!(*v*<2af|m6IKWR7Yd$w$ z)l0<9mP{DdD;D&YG3f)eCkvxl;_j`PwqC8`S?uO3wGwuV;LJ9vN8lbPaW*3W$?hod z6TL=t%FgTDRh%gU-+lm8ay47GZ9O*wKh=-eYJJp?_*^~*7dBtx{Ky|HdYXcrH_D-B zfhvzjLlLo>HEmiK~*?b9{ zbllhSAwRDdjKLaZo2*TT{9@L~0(cUF@D}ZAcG7p9TIKYTZB5tlZ6M9n@^-0UQ4`Q0 zO@#e?-JA(s?KP$hyHFg7Aad|qGPo@i zE4CU>%Gxn(O$MQ^o`o*S1w$`;j!Up}Glv^g0n<!LL%_t?VMiH_A!cI+uqQiCFvdbknHbeT5;p^qv=x4E5ua#^_kV8KmopS(vgVjF_ z4fle0_S8{n&Vj5;-;{=t7g7YwSO@keZZIyt+tu7}t=v3NiW*=hJIPQ}om^F?_-)DOB zkVKXx{hDF(wsIA9dp=;NE0+%Pb-g^+)7G&EiI5jG=Lr*htthOWZmfR6UOV|*bE!eR zhzKK$=%_3#kHAx>Gj5!rV|E0ITyAA!dA&3Cw8!~{we^MNlQ7rDS{q9b?A$PZu=-%m zp}%mdGe%-pZe_9BSZcz3;06x%2fr5P`-u?LoID$BM?e@@2>g``T<5}kX4l6=_ziA1 zvT>$^{I)Z2i-Wa2&`ozgVS^*oXD9j&_8Jqohr^rh%qPB~!uN!SmV5Ww@$gF9!#&{< z9zNQ?D@>51V|bYfY%}4H_GiRhZ-ek<-wt-SbRC;5CUB7f%wC)Ao*f%1Z3p`cCh!gu z-tOsOA3jUNw+Tg$AU^m|`;NhlCjx&KU^yc`;&BE+Bej!9 zT6=y`f-HJ|qq#1vuDmEMHP;*SjrE53Dtm5d{fnoYL!(1$>#G|l*N37*t%c^|{Myjy zGv|hu;rL+*k`3l}a0|<);C^{yaj`MC2mr8bMu#3a9U47%{(N-DxN+w3joT@_W4^fZ zZ_KU6;@T%u? "Integer")), + TableSchema("ER_PRS_F", Map("CPL_REM_BSE" -> "Double")), + TableSchema("ER_PRS_F", Map("FLX_TRT_DTD" -> "Date")), + TableSchema("ER_PHA_F", Map("PHA_PRS_C13" -> "Long")) + ) + val expectedResult = Seq( + (1, 1.0, 1L, Date.valueOf("2006-02-20")) + ).toDF(inputDf.columns: _*) + + val mainTableName = "ER_PRS_F" + val dateFormat = "yyyy-MM-dd" + + // When + import OldFlatHelper.ImplicitDF + val result = inputDf.changeSchema(inputSchema, mainTableName, dateFormat) + + // Then + expectedResult.show + expectedResult.printSchema + result.show + result.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(inputDf.schema != result.schema) + assert(result sameAs expectedResult) + } + + it should "consider default date format as dd/MM/yyyy when it is not specified explicitly" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val inputColumn = "FLX_TRT_DTD" + val inputDf = Seq("20/02/2006").toDF(inputColumn) + + import org.apache.spark.sql.functions._ + val dateFormat = "dd/MM/yyyy" + val expectedResultColumn = to_date( + unix_timestamp(inputDf(inputColumn), dateFormat).cast(TimestampType) + ).as(inputColumn) + + val expectedResult = inputDf.select(expectedResultColumn) + val schemaMap = List(TableSchema("ER_PRS_F", Map("FLX_TRT_DTD" -> "Date"))) + + val mainTableName = "ER_PRS_F" + + // When + import OldFlatHelper.ImplicitDF + val result = inputDf.changeSchema(schemaMap, mainTableName) + + // Then + inputDf.printSchema + inputDf.show + expectedResult.show + result.show + result.printSchema + + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + assert(inputDf.schema != result.schema) + } + + "annotateJoiningTablesColumns" should "prefix table name to the column names of the joining " + + "tables" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val testInput: List[TableSchema] = List( + TableSchema("ER_PRS_F", Map("Col1PRS" -> "Type1")), + TableSchema("ER_PRS_F", Map("Col2PRS" -> "Type2")), + TableSchema("ER_PHA_F", Map("Col1" -> "Type1")), + TableSchema("ER_PHA_F", Map("Col2" -> "Type2")), + TableSchema("Random", Map("Col1" -> "Type1")), + TableSchema("Random", Map("Col2" -> "Type2")) + ) + val expectedResult: List[Map[String, String]] = List( + Map("Col1PRS" -> "Type1"), + Map("Col2PRS" -> "Type2"), + Map("ER_PHA_F__Col1" -> "Type1"), + Map("ER_PHA_F__Col2" -> "Type2"), + Map("Random__Col1" -> "Type1"), + Map("Random__Col2" -> "Type2") + ) + val mainTableName = "ER_PRS_F" + val sampleDf = Seq("dummyValue").toDF("Col_1") + + // When + import OldFlatHelper.ImplicitDF + val testResult = testInput.map { testInput => + sampleDf.annotateJoiningTablesColumns(testInput, mainTableName) + } + + // Then + testResult foreach println + assert(testResult === expectedResult) + } + + "prefixColumnName" should "concatenate two given string with __ (double underscores)" in { + + // Given + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + val tableName = "ER_PHA_F" + val columnName = "FLX_TRT_DTD" + val expectedResult = tableName + "__" + columnName + val sampleDf = Seq("dummyValue").toDF("Col_1") + + // When + import OldFlatHelper.ImplicitDF + val result = sampleDf.prefixColName(tableName, columnName) + + // Then + assert(result == expectedResult) + } + + "writeStatistics" should "compute statistics on the input DF and write it in a given path" in { + + // Given + val inputDfPath = "src/test/resources/statistics/flat_table/input/newMCO" + val expectedResultPath = "src/test/resources/statistics/flat_table/expected/newMCOStat" + val resultPath = "target/test/output/statistics/newMCO" + val inputDf = sqlContext.read.option("mergeSchema", "true").parquet(inputDfPath).drop("year") + val expectedResult = sqlContext.read.parquet(expectedResultPath) + + // When + import OldFlatHelper.ImplicitDF + inputDf.writeStatistics(resultPath) + val result = sqlContext.read.parquet(resultPath) + + // Then + import utilities.DFUtils.CSVDataFrame + assert(result sameAs expectedResult) + + } + +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala index a848c3c..de8132f 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/statistics/StatisticsMainSuite.scala @@ -1,173 +1,9 @@ package fr.polytechnique.cmap.cnam.statistics -import java.sql.Date -import org.apache.spark.sql.types._ import fr.polytechnique.cmap.cnam.{SharedContext, utilities} class StatisticsMainSuite extends SharedContext{ - "changeColumnNameDelimiter" should "change the column name delimiters from dot to underscore " in { - - // Given - val sqlCtx = sqlContext - import sqlCtx.implicits._ - - val inputDf = Seq( - (1, 1.0, "val1", "val2") - ).toDF("key1", "noDelimiter", "key.delimiter1", "key.delimiter2") - - val expectedResult = Seq( - (1, 1.0, "val1", "val2") - ).toDF("key1", "noDelimiter", "key__delimiter1", "key__delimiter2") - - // When - import StatisticsMain.OldFlatHelper - val result = inputDf.changeColumnNameDelimiter - - // Then - import utilities.DFUtils.CSVDataFrame - assert(result sameAs expectedResult) - } - - "changeSchema" should "change the column types to the passed format" in { - - // Given - val sqlCtx = sqlContext - import sqlCtx.implicits._ - - val inputDf = Seq( - ("1", "1", "1", "2006-2-20") - ).toDF("BEN_CTA_TYP", "CPL_REM_BSE", "ER_PHA_F__PHA_PRS_C13", "FLX_TRT_DTD") - - val inputSchema: Map[String, List[(String, String)]] = Map( - "ER_PRS_F" -> List(("BEN_CTA_TYP", "Integer"), ("CPL_REM_BSE", "Double"), ("FLX_TRT_DTD", "Date")), - "ER_PHA_F" -> List(("PHA_PRS_C13", "Long")) - ) - val expectedResult = Seq( - (1, 1.0, 1L, Date.valueOf("2006-02-20")) - ).toDF(inputDf.columns: _*) - - val mainTableName = "ER_PRS_F" - val dateFormat = "yyyy-MM-dd" - - // When - import StatisticsMain.OldFlatHelper - val result = inputDf.changeSchema(inputSchema, mainTableName, dateFormat) - - // Then - expectedResult.show - expectedResult.printSchema - result.show - result.printSchema - - import utilities.DFUtils.CSVDataFrame - assert(inputDf.schema != result.schema) - assert(result sameAs expectedResult) - } - - it should "consider default date format as dd/MM/yyyy when it is not specified explicitly" in { - - // Given - val sqlCtx = sqlContext - import sqlCtx.implicits._ - - val inputColumn = "FLX_TRT_DTD" - val inputDf = Seq("20/02/2006").toDF(inputColumn) - - import org.apache.spark.sql.functions._ - val dateFormat = "dd/MM/yyyy" - val expectedResultColumn = to_date( - unix_timestamp(inputDf(inputColumn), dateFormat).cast(TimestampType) - ).as(inputColumn) - - val expectedResult = inputDf.select(expectedResultColumn) - val schemaMap = Map("ER_PRS_F" -> List(("FLX_TRT_DTD", "Date"))) - val mainTableName = "ER_PRS_F" - - // When - import StatisticsMain.OldFlatHelper - val result = inputDf.changeSchema(schemaMap, mainTableName) - - // Then - inputDf.printSchema - inputDf.show - expectedResult.show - result.show - result.printSchema - - import utilities.DFUtils.CSVDataFrame - assert(result sameAs expectedResult) - assert(inputDf.schema != result.schema) - } - - "annotateJoiningTablesColumns" should "prefix table name to the column names of the joining " + - "tables" in { - - // Given - val sqlCtx = sqlContext - import sqlCtx.implicits._ - - val testInput: List[(String, List[(String,String)])] = List( - "ER_PRS_F" -> List(("Col1", "Type1"), ("Col2", "Type2")), - "ER_PHA_F" -> List(("Col1", "Type1"), ("Col2", "Type2")), - "Random" -> List(("Col1", "Type1"), ("Col2", "Type2")) - ) - val expectedResult: List[Map[String, String]] = List( - Map("Col1" -> "Type1", "Col2" -> "Type2"), - Map("ER_PHA_F__Col1" -> "Type1", "ER_PHA_F__Col2" -> "Type2"), - Map("Random__Col1" -> "Type1", "Random__Col2" -> "Type2") - ) - val mainTableName = "ER_PRS_F" - val sampleDf = Seq("dummyValue").toDF("Col_1") - - // When - import StatisticsMain.OldFlatHelper - val testResult = testInput.map(testInput => sampleDf.annotateJoiningTablesColumns(testInput, mainTableName)) - - // Then - testResult foreach println - assert(testResult === expectedResult) - } - - "prefixColumnName" should "concatenate two given string with __ (double underscores)" in { - - // Given - val sqlCtx = sqlContext - import sqlCtx.implicits._ - - val tableName = "ER_PHA_F" - val columnName = "FLX_TRT_DTD" - val expectedResult = tableName + "__" + columnName - val sampleDf = Seq("dummyValue").toDF("Col_1") - - // When - import StatisticsMain.OldFlatHelper - val result = sampleDf.prefixColName(tableName, columnName) - - // Then - assert(result == expectedResult) - } - - "writeStatistics" should "compute statistics on the input DF and write it in a given path" in { - - // Given - val inputDfPath = "src/test/resources/statistics/flat_table/input/newMCO" - val expectedResultPath = "src/test/resources/statistics/flat_table/expected/newMCOStat" - val resultPath = "target/test/output/statistics/newMCO" - val inputDf = sqlContext.read.option("mergeSchema", "true").parquet(inputDfPath).drop("year") - val expectedResult = sqlContext.read.parquet(expectedResultPath) - - // When - import StatisticsMain.OldFlatHelper - inputDf.writeStatistics(resultPath) - val result = sqlContext.read.parquet(resultPath) - - // Then - import utilities.DFUtils.CSVDataFrame - assert(result sameAs expectedResult) - - } - "run" should "run the overall pipeline correctly without any error" in { // Given