Skip to content

Commit

Permalink
CNAM-191 Added initial logic to compute statistics on single tables
Browse files Browse the repository at this point in the history
CNAM-191 Fixed StatisticsMain
  • Loading branch information
danielpes committed May 31, 2017
1 parent c34039c commit a1bcdb0
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 74 deletions.
56 changes: 51 additions & 5 deletions src/main/resources/statistics/cmap.conf
Original file line number Diff line number Diff line change
@@ -1,31 +1,77 @@
compare_with_old = true

# old_flat tables are only used when comparing flat data
old_flat = [
{
name = "DCIR"
main_table = "ER_PRS_F"
date_format = "dd/MM/yyyy"
input_path = "/shared/Observapur/old_flattening/joins/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/old_flat"
}
{
name = "MCO"
main_table = "MCO_C"
date_format = "ddMMMyyyy"
input_path = "/shared/Observapur/old_flattening/joins/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/old_flat/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/old_flat"
}
]

new_flat = [
main_flat = [
{
name = "DCIR"
main_table = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/new_flat"
}
{
name = "MCO"
main_table = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/new_flat/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/new_flat"
}
]

single_tables = [
{
name = "ER_CAM_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_CAM_F"
}
{
name = "ER_PHA_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PHA_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PHA_F"
}
{
name = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PRS_F"
}
{
name = "MCO_A"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_A"
}
{
name = "MCO_B"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_B"
}
{
name = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_C"
}
{
name = "MCO_D"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_D"
}
{
name = "MCO_UM"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_UM"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_UM"
}
]
2 changes: 2 additions & 0 deletions src/main/resources/statistics/test.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
compare_with_old = true

new_flat = [
{
name = "MCO"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,51 @@ package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{NumericType, StructField, StructType}
import org.apache.spark.sql.types.{NumericType, StructField}

object CustomStatistics {

implicit class Statistics(val df: DataFrame) {

def customDescribe(inputColumns: Seq[String], distinctOnly: Boolean = false): DataFrame = {
def customDescribe(
colNames: Seq[String] = df.columns,
distinctOnly: Boolean = false): DataFrame = {

def computeAvailableAgg(
schema: StructType,
colName: String): DataFrame = df.schema(colName) match {
def columnStatistics(colName: String): DataFrame = df.schema(colName) match {

// For numeric type, compute all stats
case StructField(numericColumn: String, _: NumericType, _, _) =>
df.select(numericColumn)
.agg(
min(numericColumn).cast("string").as("Min"),
max(numericColumn).cast("string").as("Max"),
count(numericColumn).cast("long").as("Count"),
countDistinct(numericColumn).cast("long").as("CountDistinct"),
sum(numericColumn).cast("double").as("Sum"),
sumDistinct(numericColumn).cast("double").as("SumDistinct"),
avg(numericColumn).cast("double").as("Avg")
).withColumn("ColName", lit(numericColumn))

.agg(
min(numericColumn).cast("string").as("Min"),
max(numericColumn).cast("string").as("Max"),
count(numericColumn).cast("long").as("Count"),
countDistinct(numericColumn).cast("long").as("CountDistinct"),
sum(numericColumn).cast("double").as("Sum"),
sumDistinct(numericColumn).cast("double").as("SumDistinct"),
avg(numericColumn).cast("double").as("Avg")
).withColumn("ColName", lit(numericColumn))

// For other types, compute only min, max and counts
case _ =>
df.select(colName)
.agg(
min(colName).cast("string").as("Min"),
max(colName).cast("string").as("Max"),
count(colName).cast("long").as("Count"),
countDistinct(colName).cast("long").as("CountDistinct")
).withColumn("Sum", lit(null).cast("double"))
.withColumn("SumDistinct", lit(null).cast("double"))
.withColumn("Avg", lit(null).cast("double"))
.withColumn("ColName", lit(colName))
.agg(
min(colName).cast("string").as("Min"),
max(colName).cast("string").as("Max"),
count(colName).cast("long").as("Count"),
countDistinct(colName).cast("long").as("CountDistinct")
).withColumn("Sum", lit(null).cast("double"))
.withColumn("SumDistinct", lit(null).cast("double"))
.withColumn("Avg", lit(null).cast("double"))
.withColumn("ColName", lit(colName))
}

val outputDF: DataFrame = inputColumns
.map(computeAvailableAgg(df.schema, _))
val outputDF: DataFrame = colNames
.map(columnStatistics)
.reduce(_.union(_))

if (distinctOnly) {
outputDF
.drop("Count", "Sum", "Avg")
} else {
outputDF
}
if (distinctOnly) outputDF.drop("Count", "Sum", "Avg")
else outputDF
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.spark.sql.functions.col
import fr.polytechnique.cmap.cnam.statistics
import fr.polytechnique.cmap.cnam.utilities.DFUtils

object OldFlatHelper {
object FlatTableHelper {

implicit class ImplicitDF(data: DataFrame) {

Expand Down Expand Up @@ -60,13 +60,15 @@ object OldFlatHelper {

import statistics.CustomStatistics._

def computeStatistics: DataFrame = data.customDescribe(data.columns)
def computeStatistics(distinctOnly: Boolean): DataFrame = {
data.customDescribe(data.columns, distinctOnly)
}

def writeStatistics(outputPath: String): Unit = {
def writeStatistics(outputPath: String, distinctOnly: Boolean = false): Unit = {
data
.computeStatistics
.write
.parquet(outputPath)
.computeStatistics(distinctOnly)
.write
.parquet(outputPath)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import fr.polytechnique.cmap.cnam.statistics.CustomStatistics._

object SingleTablesStatistics {

  // TODO: This method only works when the individual DataFrames contain no keys (key column
  //   names) that are missing from the DataFrame holding the key (e.g., PRS for DCIR, C for
  //   PMSI MCO). This restriction should be lifted in a future version.

  /** Computes distinct-only statistics over a set of single tables.
    *
    * Every column name appearing in any of the given tables is described once: the values of
    * that column are unioned across all tables that define it, then summarized via
    * `customDescribe(distinctOnly = true)`. The per-column results are unioned into one
    * DataFrame.
    *
    * @param singleTables the single-table DataFrames to describe (must be non-empty)
    * @return one row-group of statistics per distinct column name
    */
  def describeSingleTables(singleTables: List[DataFrame]): DataFrame = {

    // Unions the values of the given column across every table that defines it.
    def unionColumnAcrossTables(columnName: String): DataFrame = {
      val perTableColumn = singleTables
        .filter(_.columns.contains(columnName))
        .map(_.select(col(columnName)))
      perTableColumn.reduce(_.union(_))
    }

    // Distinct set of all column names seen in any input table.
    val allColumnNames: Set[String] = singleTables.flatMap(_.columns).toSet

    val statsPerColumn: List[DataFrame] = allColumnNames.toList.map { columnName =>
      unionColumnAcrossTables(columnName).customDescribe(distinctOnly = true)
    }

    statsPerColumn.reduce(_.union(_))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ object StatisticsConfig {
newConfig.withFallback(defaultConfig).resolve()
}

val compareWithOldFlattening: Boolean = conf.getBoolean("compare_with_old")

val oldFlatConfig: List[Config] = {
if(conf.hasPath("old_flat")) {
conf.getConfigList("old_flat").asScala.toList
Expand All @@ -25,14 +27,22 @@ object StatisticsConfig {
}
}

val newFlatConfig: List[Config] = {
val mainFlatConfig: List[Config] = {
if(conf.hasPath("new_flat")) {
conf.getConfigList("new_flat").asScala.toList
} else {
List[Config]()
}
}

val singleTablesConfig: List[Config] = {
if(conf.hasPath("single_tables")) {
conf.getConfigList("single_tables").asScala.toList
} else {
List[Config]()
}
}

implicit class StatConfig(statConf: Config) {

val flatTableName: String = statConf.getString("name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fr.polytechnique.cmap.cnam.statistics

import org.apache.spark.sql.{Dataset, SQLContext}
import fr.polytechnique.cmap.cnam.flattening.FlatteningConfig
import fr.polytechnique.cmap.cnam.utilities.DFUtils.readParquet
import fr.polytechnique.cmap.cnam.{Main, utilities}

case class TableSchema(tableName: String, columnTypes: Map[String, String])
Expand All @@ -12,36 +13,62 @@ object StatisticsMain extends Main {

def run(sqlContext: SQLContext, argsMap: Map[String, String]): Option[Dataset[_]] = {

argsMap.get("conf").foreach(sqlContext.setConf("conf", _))
argsMap.get("env").foreach(sqlContext.setConf("env", _))
def describeOldFlatTable(): Unit = {

import utilities.DFUtils._
val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x =>
TableSchema(x._1, x._2.toMap)).toList
import FlatTableHelper.ImplicitDF
import StatisticsConfig.StatConfig

import OldFlatHelper.ImplicitDF
import StatisticsConfig.StatConfig
val tablesSchema: List[TableSchema] = FlatteningConfig.columnTypes.map(x =>
TableSchema(x._1, x._2.toMap)).toList

StatisticsConfig.oldFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}")
println(conf.prettyPrint)
StatisticsConfig.oldFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the old Flat: ${conf.flatTableName}")
println(conf.prettyPrint)

readParquet(sqlContext, conf.inputPath)
.drop("key")
.changeColumnNameDelimiter
.changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat)
.writeStatistics(conf.statOutputPath)
readParquet(sqlContext, conf.inputPath)
.drop("key")
.changeColumnNameDelimiter
.changeSchema(tablesSchema, conf.mainTableName, conf.dateFormat)
}
}

StatisticsConfig.newFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the new Flat: ${conf.flatTableName}")
println(conf.prettyPrint)
def describeSingleTables(): Unit = {

import StatisticsConfig.StatConfig

StatisticsConfig.singleTablesConfig.foreach { conf =>
logger.info(s"Computing Statistics on the single table: ${conf.flatTableName}")
println(conf.prettyPrint)

readParquet(sqlContext, conf.inputPath)
.drop("year")
.writeStatistics(conf.statOutputPath)
import CustomStatistics.Statistics
readParquet(sqlContext, conf.inputPath)
.customDescribe(distinctOnly = true)
.write.parquet(conf.statOutputPath)
}
}

def describeMainFlatTable(): Unit = {

import FlatTableHelper.ImplicitDF
import StatisticsConfig.StatConfig

StatisticsConfig.mainFlatConfig.foreach { conf =>
logger.info(s"Computing Statistics on the main Flat: ${conf.flatTableName}")
println(conf.prettyPrint)

readParquet(sqlContext, conf.inputPath)
.drop("year")
.writeStatistics(conf.statOutputPath)
}
}

argsMap.get("conf").foreach(sqlContext.setConf("conf", _))
argsMap.get("env").foreach(sqlContext.setConf("env", _))

if(StatisticsConfig.compareWithOldFlattening) describeOldFlatTable()
describeSingleTables()
describeMainFlatTable()

None
}
}
Loading

0 comments on commit a1bcdb0

Please sign in to comment.