Skip to content

Commit

Permalink
CNAM-191 Refactored StatisticsConfig and StatisticsMain
Browse files Browse the repository at this point in the history
CNAM-191 Fixes and tests added

CNAM-191 Small tweaks

CNAM-191 Fixed tests

CNAM-191 fixes

CNAM-191 added tests
  • Loading branch information
danielpes committed May 31, 2017
1 parent a1bcdb0 commit 7ff731c
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 217 deletions.
95 changes: 44 additions & 51 deletions src/main/resources/statistics/cmap.conf
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
compare_with_old = true
describe_old = true

# old_flat tables are only used when comparing flat data
# old_flat tables are only used when describing old flat data
old_flat = [
{
name = "DCIR"
table_name = "DCIR"
main_table = "ER_PRS_F"
date_format = "dd/MM/yyyy"
input_path = "/shared/Observapur/old_flattening/joins/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/old_flat"
}
{
name = "MCO"
table_name = "MCO"
main_table = "MCO_C"
date_format = "ddMMMyyyy"
input_path = "/shared/Observapur/old_flattening/joins/MCO"
Expand All @@ -20,58 +20,51 @@ old_flat = [

main_flat = [
{
name = "DCIR"
table_name = "DCIR"
main_table = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/flat_table/DCIR"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/new_flat"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR"
single_tables = [
{
table_name = "ER_CAM_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F"
}
{
table_name = "ER_PHA_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PHA_F"
}
{
table_name = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F"
}
]
}
{
name = "MCO"
table_name = "MCO"
main_table = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/flat_table/MCO"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/new_flat"
}
]

single_tables = [
{
name = "ER_CAM_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_CAM_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_CAM_F"
}
{
name = "ER_PHA_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PHA_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PHA_F"
}
{
name = "ER_PRS_F"
input_path = "/shared/Observapur/staging/Flattening/single_table/ER_PRS_F"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/DCIR/single_tables/table=ER_PRS_F"
}
{
name = "MCO_A"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_A"
}
{
name = "MCO_B"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_B"
}
{
name = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_C"
}
{
name = "MCO_D"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_D"
}
{
name = "MCO_UM"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_UM"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO/single_tables/table=MCO_UM"
output_stat_path = "/shared/Observapur/staging/Flattening/statistics/MCO"
single_tables = [
{
table_name = "MCO_A"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_A"
}
{
table_name = "MCO_B"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_B"
}
{
table_name = "MCO_C"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_C"
}
{
table_name = "MCO_D"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_D"
}
{
table_name = "MCO_UM"
input_path = "/shared/Observapur/staging/Flattening/single_table/MCO_UM"
}
]
}
]
8 changes: 4 additions & 4 deletions src/main/resources/statistics/test.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
compare_with_old = true
describe_old = true

new_flat = [
main_flat = [
{
name = "MCO"
table_name = "MCO"
main_table = "MCO_C"
input_path = "src/test/resources/statistics/flat_table/input/newMCO"
output_stat_path = "target/test/output/statistics/newMCO"
output_stat_path = "target/test/output/statistics/MCO"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ object FlatteningConfig {
newConfig.withFallback(defaultConfig).resolve()
}

val schemaFilePath: List[String] = conf.getStringList("schema_file_path").asScala.toList
lazy val schemaFilePath: List[String] = conf.getStringList("schema_file_path").asScala.toList

val outputBasePath: String = conf.getString("single_table_path")
lazy val outputBasePath: String = conf.getString("single_table_path")

val tablesConfigList: List[Config] = conf.getConfigList("tables_config").asScala.toList
lazy val tablesConfigList: List[Config] = conf.getConfigList("tables_config").asScala.toList
.map(_.getConfigList("tables").asScala.toList)
.reduce(_ ::: _)

val partitionsList: List[ConfigPartition] = getPartitionList(tablesConfigList)
val joinTablesConfig: List[Config] = conf.getConfigList("join").asScala.toList
lazy val partitionsList: List[ConfigPartition] = getPartitionList(tablesConfigList)
lazy val joinTablesConfig: List[Config] = conf.getConfigList("join").asScala.toList

private val csvSchema = CSVSchemaReader.readSchemaFiles(schemaFilePath)
val columnTypes: Map[String, List[(String, String)]] = CSVSchemaReader.readColumnsType(csvSchema)
private lazy val csvSchema = CSVSchemaReader.readSchemaFiles(schemaFilePath)
lazy val columnTypes: Map[String, List[(String, String)]] = CSVSchemaReader.readColumnsType(csvSchema)

implicit class SingleTableConfig(config: Config) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ object CustomStatistics {
max(numericColumn).cast("string").as("Max"),
count(numericColumn).cast("long").as("Count"),
countDistinct(numericColumn).cast("long").as("CountDistinct"),
sum(numericColumn).cast("double").as("Sum"),
sumDistinct(numericColumn).cast("double").as("SumDistinct"),
avg(numericColumn).cast("double").as("Avg")
round(sum(numericColumn).cast("double"), 4).as("Sum"),
round(sumDistinct(numericColumn).cast("double"), 4).as("SumDistinct"),
round(avg(numericColumn).cast("double"), 4).as("Avg")
).withColumn("ColName", lit(numericColumn))

// For other types, compute only min, max and counts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.spark.sql.functions.col
import fr.polytechnique.cmap.cnam.statistics
import fr.polytechnique.cmap.cnam.utilities.DFUtils

object FlatTableHelper {
object DataFrameHelper {

implicit class ImplicitDF(data: DataFrame) {

Expand Down Expand Up @@ -70,6 +70,12 @@ object FlatTableHelper {
.write
.parquet(outputPath)
}

def prefixColumnNames(prefix: String, separator: String = "__"): DataFrame = {
data.columns.foldLeft(data) {
(curDF, colName) => curDF.withColumnRenamed(colName, prefix + separator + colName)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package fr.polytechnique.cmap.cnam.statistics

import scala.collection.JavaConverters._
import com.typesafe.config.Config

case class FlatTableConfig(
tableName: String,
centralTable: String,
dateFormat: String,
inputPath: String,
outputStatPath: String,
singleTables: List[SingleTableConfig]) {

override def toString: String = {
s"tableName -> $tableName \n" +
s"centralTable -> $centralTable \n" +
s"dateFormat -> $dateFormat \n" +
s"inputPath -> $inputPath \n" +
s"outputStatPath -> $outputStatPath \n" +
s"singleTableCount -> ${singleTables.size}"
}
}

object FlatTableConfig {

def fromConfig(c: Config): FlatTableConfig = {

val singleTables = if (c.hasPath("single_tables"))
c.getConfigList("single_tables").asScala.toList.map(SingleTableConfig.fromConfig)
else
List[SingleTableConfig]()

FlatTableConfig(
tableName = c.getString("table_name"),
centralTable = c.getString("main_table"),
dateFormat = if (c.hasPath("date_format")) c.getString("date_format") else "dd/MM/yyyy",
inputPath = c.getString("input_path"),
outputStatPath = c.getString("output_stat_path"),
singleTables = singleTables
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package fr.polytechnique.cmap.cnam.statistics

import com.typesafe.config.Config

case class SingleTableConfig(
tableName: String,
dateFormat: String,
inputPath: String) {

override def toString: String = {
s"tableName -> $tableName \n" +
s"dateFormat -> $dateFormat \n" +
s"inputPath -> $inputPath"
}
}

object SingleTableConfig {

def fromConfig(c: Config): SingleTableConfig = {
SingleTableConfig(
tableName = c.getString("table_name"),
dateFormat = if (c.hasPath("date_format")) c.getString("date_format") else "dd/MM/yyyy",
inputPath = c.getString("input_path")
)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,21 @@ object StatisticsConfig {
newConfig.withFallback(defaultConfig).resolve()
}

val compareWithOldFlattening: Boolean = conf.getBoolean("compare_with_old")
val describeOldFlatTable: Boolean = conf.getBoolean("describe_old")

val oldFlatConfig: List[Config] = {
val oldFlatConfig: List[FlatTableConfig] = {
if(conf.hasPath("old_flat")) {
conf.getConfigList("old_flat").asScala.toList
conf.getConfigList("old_flat").asScala.toList.map(FlatTableConfig.fromConfig)
} else {
List[Config]()
List[FlatTableConfig]()
}
}

val mainFlatConfig: List[Config] = {
if(conf.hasPath("new_flat")) {
conf.getConfigList("new_flat").asScala.toList
} else {
List[Config]()
}
}

val singleTablesConfig: List[Config] = {
if(conf.hasPath("single_tables")) {
conf.getConfigList("single_tables").asScala.toList
} else {
List[Config]()
}
}

implicit class StatConfig(statConf: Config) {

val flatTableName: String = statConf.getString("name")
val mainTableName: String = statConf.getString("main_table")
val dateFormat: String = {
if(statConf.hasPath("date_format")) {
statConf.getString("date_format")
} else {
"dd/MM/yyyy"
}
}

val inputPath: String = statConf.getString("input_path")
val statOutputPath: String = statConf.getString("output_stat_path")

def prettyPrint: String = {
s"flatTableName -> $flatTableName \n" +
s"mainTableName -> $mainTableName \n" +
s"dateFormat -> $dateFormat \n" +
s"inputPath -> $inputPath \n" +
s"statOutputPath -> $statOutputPath"
val mainFlatConfig: List[FlatTableConfig] = {
if(conf.hasPath("main_flat")) {
conf.getConfigList("main_flat").asScala.toList.map(FlatTableConfig.fromConfig)
} else {
List[FlatTableConfig]()
}
}
}
Loading

0 comments on commit 7ff731c

Please sign in to comment.