Skip to content

Commit

Permalink
Merge pull request #4 from X-DataInitiative/CNAM-181-stat-flatTable
Browse files Browse the repository at this point in the history
CNAM-181 Initial Working Version
  • Loading branch information
sathiyapk committed May 17, 2017
2 parents 25d811c + c15b842 commit c34039c
Show file tree
Hide file tree
Showing 16 changed files with 632 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/main/resources/statistics/cmap.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
old_flat = [
{
name = "DCIR"
main_table = "ER_PRS_F"
date_format = "dd/MM/yyyy"
input_path = "/shared/Observapur/old_flattening/joins/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/DCIR"
}
{
name = "MCO"
main_table = "MCO_C"
date_format = "ddMMMyyyy"
input_path = "/shared/Observapur/old_flattening/joins/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/MCO"
}
]

new_flat = [
{
name = "DCIR"
main_table = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/DCIR"
}
{
name = "MCO"
main_table = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/MCO"
}
]
7 changes: 7 additions & 0 deletions src/main/resources/statistics/main.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cmap = {
include "cmap.conf"
}

test = {
include "test.conf"
}
8 changes: 8 additions & 0 deletions src/main/resources/statistics/test.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
new_flat = [
{
name = "MCO"
main_table = "MCO_C"
input_path = "src/test/resources/statistics/flat_table/input/newMCO"
output_stat_path = "target/test/output/statistics/newMCO"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{NumericType, StructField, StructType}

object CustomStatistics {

implicit class Statistics(val df: DataFrame) {

def customDescribe(inputColumns: Seq[String], distinctOnly: Boolean = false): DataFrame = {

def computeAvailableAgg(
schema: StructType,
colName: String): DataFrame = df.schema(colName) match {

case StructField(numericColumn: String, _: NumericType, _, _) =>
df.select(numericColumn)
.agg(
min(numericColumn).cast("string").as("Min"),
max(numericColumn).cast("string").as("Max"),
count(numericColumn).cast("long").as("Count"),
countDistinct(numericColumn).cast("long").as("CountDistinct"),
sum(numericColumn).cast("double").as("Sum"),
sumDistinct(numericColumn).cast("double").as("SumDistinct"),
avg(numericColumn).cast("double").as("Avg")
).withColumn("ColName", lit(numericColumn))

case _ =>
df.select(colName)
.agg(
min(colName).cast("string").as("Min"),
max(colName).cast("string").as("Max"),
count(colName).cast("long").as("Count"),
countDistinct(colName).cast("long").as("CountDistinct")
).withColumn("Sum", lit(null).cast("double"))
.withColumn("SumDistinct", lit(null).cast("double"))
.withColumn("Avg", lit(null).cast("double"))
.withColumn("ColName", lit(colName))
}

val outputDF: DataFrame = inputColumns
.map(computeAvailableAgg(df.schema, _))
.reduce(_.union(_))

if (distinctOnly) {
outputDF
.drop("Count", "Sum", "Avg")
} else {
outputDF
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import fr.polytechnique.cmap.cnam.statistics
import fr.polytechnique.cmap.cnam.utilities.DFUtils

object OldFlatHelper {

implicit class ImplicitDF(data: DataFrame) {

final val OldDelimiter: String = "\\."
final val NewDelimiter: String = "__"

def changeColumnNameDelimiter: DataFrame = {
val renamedColumns = data.columns.map(columnName => {
val splittedColName = columnName.split(OldDelimiter)
if (splittedColName.size == 2) {
col("`" + columnName + "`").as(splittedColName(0) + NewDelimiter + splittedColName(1))
} else {
col(columnName)
}
})

data.select(renamedColumns: _*)
}

def changeSchema(
schema: List[TableSchema],
mainTableName: String,
dateFormat: String = "dd/MM/yyyy"): DataFrame = {

val unknownColumnNameType = Map("HOS_NNE_MAM" -> "String")

val flatSchema: Map[String, String] = schema.map(tableSchema =>
annotateJoiningTablesColumns(tableSchema, mainTableName)
).reduce(_ ++ _) ++ unknownColumnNameType

DFUtils.applySchema(data, flatSchema, dateFormat)
}

def annotateJoiningTablesColumns(
tableSchema: TableSchema,
mainTableName: String): Map[String, String] = {

val tableName: String = tableSchema.tableName
val columnTypeMap: Map[String, String] = tableSchema.columnTypes

tableName match {
case `mainTableName` => columnTypeMap
case _ => columnTypeMap.map {
case (colName, colType) => (prefixColName(tableName, colName), colType)
}
}
}

def prefixColName(tableName: String, columnName: String): String = {
tableName + NewDelimiter + columnName
}

import statistics.CustomStatistics._

def computeStatistics: DataFrame = data.customDescribe(data.columns)

def writeStatistics(outputPath: String): Unit = {
data
.computeStatistics
.write
.parquet(outputPath)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package fr.polytechnique.cmap.cnam.statistics

import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import com.typesafe.config.{Config, ConfigFactory}

object StatisticsConfig {

private lazy val conf: Config = {
val sqlContext = SparkSession.builder().getOrCreate().sqlContext
val configPath: String = sqlContext.getConf("conf", "")
val environment: String = sqlContext.getConf("env", "test")

val defaultConfig = ConfigFactory.parseResources("statistics/main.conf").resolve().getConfig(environment)
val newConfig = ConfigFactory.parseFile(new java.io.File(configPath)).resolve()

newConfig.withFallback(defaultConfig).resolve()
}

val oldFlatConfig: List[Config] = {
if(conf.hasPath("old_flat")) {
conf.getConfigList("old_flat").asScala.toList
} else {
List[Config]()
}
}

val newFlatConfig: List[Config] = {
if(conf.hasPath("new_flat")) {
conf.getConfigList("new_flat").asScala.toList
} else {
List[Config]()
}
}

implicit class StatConfig(statConf: Config) {

val flatTableName: String = statConf.getString("name")
val mainTableName: String = statConf.getString("main_table")
val dateFormat: String = {
if(statConf.hasPath("date_format")) {
statConf.getString("date_format")
} else {
"dd/MM/yyyy"
}
}

val inputPath: String = statConf.getString("input_path")
val statOutputPath: String = statConf.getString("output_stat_path")

def prettyPrint: String = {
s"flatTableName -> $flatTableName \n" +
s"mainTableName -> $mainTableName \n" +
s"dateFormat -> $dateFormat \n" +
s"inputPath -> $inputPath \n" +
s"statOutputPath -> $statOutputPath"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.{Dataset, SQLContext}
import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig
import fr.polytechnique.cmap.cnam.{Main, utilities}

case class TableSchema(tableName: String, columnTypes: Map[String, String])

object StatisticsMain extends Main {

override def appName = "Statistics"

def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = {

argsMap.get("conf").foreach(sqlContext.setConf("conf", _))
argsMap.get("env").foreach(sqlContext.setConf("env", _))

import utilities.DFUtils._
val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x =>
TableSchema(x._1, x._2.toMap)).toList

import OldFlatHelper.ImplicitDF
import StatisticsConfig.StatConfig

StatisticsConfig.oldFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}")
println(conf.prettyPrint)

readParquet(sqlContext, conf.inputPath)
.drop("key")
.changeColumnNameDelimiter
.changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat)
.writeStatistics(conf.statOutputPath)
}

StatisticsConfig.newFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the new Flat: ${conf.flatTableName}")
println(conf.prettyPrint)

readParquet(sqlContext, conf.inputPath)
.drop("year")
.writeStatistics(conf.statOutputPath)
}

None
}
}
3 changes: 3 additions & 0 deletions src/test/resources/statistics/custom-statistics/IR_BEN_R.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEN_CDI_NIR;BEN_DTE_INS;BEN_DTE_MAJ;BEN_NAI_ANN;BEN_NAI_MOI;BEN_RES_COM;BEN_RES_DPT;BEN_RNG_GEM;BEN_SEX_COD;BEN_TOP_CNS;MAX_TRT_DTD;ORG_CLE_NEW;ORG_AFF_BEN;BEN_DCD_AME;BEN_DCD_DTE;NUM_ENQ
00;;01/01/2006;1975;01;004;02A;1;2;1;;CODE1234;CODE1234;000101;;Patient_01
00;;25/01/2006;1959;10;114;075;1;1;1;07/03/2008;CODE1234;CODE1234;200801;25/01/2008;Patient_02
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit c34039c

Please sign in to comment.