From 6de4e2efbda3bdc68efb543d1dd301b4cbf4cc7e Mon Sep 17 00:00:00 2001
From: spenes
Date: Tue, 24 Sep 2024 01:41:31 +0300
Subject: [PATCH] Transformer Stream: don't use hard-coded window timestamps in
 the tests

Some of the tests are failing because the windows don't have the
expected timestamps. To solve this problem, this commit reads the
window timestamps from the shredding_complete message instead of
using hard-coded values.
---
 .../1/output/good/tsv/completion.json         |  6 +--
 .../1/output/good/widerow/completion.json     |  6 +--
 .../resources/processing-spec/3/output/bad    |  2 +-
 .../processing-spec/3/output/completion.json  |  6 +--
 .../4/output/good/parquet/completion.json     |  6 +--
 .../5/output/good/parquet/completion.json     |  6 +--
 .../6/output/good/parquet/completion.json     |  6 +--
 .../7/output/good/parquet/completion.json     |  6 +--
 .../processing-spec/8/output/completion.json  |  6 +--
 .../processing/BaseProcessingSpec.scala       | 26 ++++++++-
 .../common/processing/QueueBadSinkSpec.scala  | 10 ++--
 .../processing/ShredTsvProcessingSpec.scala   | 48 ++++++-----------
 .../common/processing/ShutdownSpec.scala      |  5 +-
 .../WiderowJsonProcessingSpec.scala           | 14 +++--
 .../WiderowParquetProcessingSpec.scala        | 54 +++++++++----------
 .../transformer/batch/ShredJobSpec.scala      | 10 ----
 project/BuildSettings.scala                   |  1 +
 17 files changed, 107 insertions(+), 111 deletions(-)

diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
index 99b711477..5e64d093f 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "SHREDDED",
       "types": [
@@ -113,8 +113,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-10-13T20:21:47.595072674Z",
       "max": "2021-10-15T00:51:57.521746512Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
index 7a79aecc4..5fd20cfcd 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "JSON",
@@ -93,8 +93,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-10-13T20:21:47.595072674Z",
       "max": "2021-10-15T00:51:57.521746512Z"
     },
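The completion.json fixtures above (and the ones that follow) trade the hard-coded 1970-01-01 values for placeholders; each spec then fills them in with the base path and job timestamps taken from the completion message the transformer actually emitted, so the assertions no longer depend on a mocked clock. A minimal sketch of that substitution, assuming a template string loaded from one of these resources (the literal below is illustrative; the real logic is readMessageFromResource in BaseProcessingSpec further down):

    val template: String =
      """{"base": "output_path_placeholder",
        | "jobStarted": "job_started_placeholder",
        | "jobCompleted": "job_completed_placeholder"}""".stripMargin

    def fill(base: String, jobStarted: String, jobCompleted: String): String =
      template
        .replace("output_path_placeholder", base)           // real run directory URI
        .replace("job_started_placeholder", jobStarted)     // real job start instant
        .replace("job_completed_placeholder", jobCompleted) // real job completion instant
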
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
index bb8903ff9..ceb9d8952 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
@@ -1 +1 @@
-{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"1970-01-01T10:30:00Z"}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
+{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaKey":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":""}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
\ No newline at end of file
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json
index b9fa0bf91..511c0514b 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json
@@ -1,14 +1,14 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "SHREDDED",
       "types": []
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2014-05-29T18:16:35Z",
       "max": "2014-05-29T18:16:35Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json
index c8d0470e8..a579a8d2e 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "PARQUET",
@@ -93,8 +93,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-09-17T09:05:28.590000001Z",
       "max": "2021-10-15T09:06:27.101185600Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json
index e5054d6bc..59ac05ada 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "PARQUET",
@@ -105,8 +105,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2022-02-01T22:14:21.648Z",
       "max": "2022-02-02T01:01:01.648Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json
index 173a79360..83397b2fe 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "PARQUET",
@@ -101,8 +101,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2022-02-01T22:14:21.648Z",
       "max": "2022-02-02T01:01:01.648Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json
index 671dbfc33..477f0c7d5 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "PARQUET",
@@ -21,8 +21,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2022-02-01T22:32:41.069Z",
       "max": "2022-02-02T01:01:01.648Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json
index edef7c0b5..5fd20cfcd 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "JSON",
@@ -93,8 +93,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:31:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-10-13T20:21:47.595072674Z",
       "max": "2021-10-15T00:51:57.521746512Z"
     },
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala
index c4768b665..eb6df0c7d 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala
@@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing
 
 import cats.effect.{IO, Resource}
 import cats.effect.kernel.Ref
+import io.circe.optics.JsonPath._
+import io.circe.parser.{parse => parseCirce}
+
 import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.FileUtils
@@ -58,11 +61,13 @@ trait BaseProcessingSpec extends Specification {
       }
       .reduce(_ and _)
 
-  protected def readMessageFromResource(resource: String, outputRootDirectory: Path) =
+  protected def readMessageFromResource(resource: String, completionMessageVars: BaseProcessingSpec.CompletionMessageVars) =
    readLinesFromResource(resource)
      .map(_.mkString)
      .map(
-        _.replace("output_path_placeholder", outputRootDirectory.toNioPath.toUri.toString.replaceAll("/+$", ""))
+        _.replace("output_path_placeholder", completionMessageVars.base.toNioPath.toUri.toString)
+          .replace("job_started_placeholder", completionMessageVars.jobStarted)
+          .replace("job_completed_placeholder", completionMessageVars.jobCompleted)
           .replace("version_placeholder", BuildInfo.version)
           .replace(" ", "")
      )
@@ -86,6 +93,15 @@ trait BaseProcessingSpec extends Specification {
       new String(encoder.encode(config.app.replace("file:/", "s3:/").getBytes))
     )
   }
+
+  def extractCompletionMessageVars(processingOutput: BaseProcessingSpec.ProcessingOutput): BaseProcessingSpec.CompletionMessageVars = {
+    val message      = processingOutput.completionMessages.head
+    val json         = parseCirce(message).toOption.get
+    val base         = root.data.base.string.getOption(json).get.stripPrefix("file://")
+    val jobStarted   = root.data.timestamps.jobStarted.string.getOption(json).get
+    val jobCompleted = root.data.timestamps.jobCompleted.string.getOption(json).get
+    BaseProcessingSpec.CompletionMessageVars(Path(base), jobStarted, jobCompleted)
+  }
 }
 
 object BaseProcessingSpec {
@@ -96,5 +112,13 @@
     badrowsFromQueue: Vector[String],
     checkpointed: Int
   )
+  final case class CompletionMessageVars(
+    base: Path,
+    jobStarted: String,
+    jobCompleted: String
+  ) {
+    def goodPath: Path = Path(s"$base/output=good")
+    def badPath: Path  = Path(s"$base/output=bad")
+  }
 
 }
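extractCompletionMessageVars does the extraction with circe-optics: each root.data... path is a JsonPath optic, and .string.getOption returns Some only when the field exists and is a JSON string. A standalone sketch of the same traversal (the JSON literal and its values are made up for illustration; the bare .get mirrors the spec's fail-fast style):

    import io.circe.optics.JsonPath._
    import io.circe.parser.parse

    val json = parse(
      """{"data":{"base":"file:///tmp/out/run=2024-09-24-01-41-31-abc/","timestamps":{"jobStarted":"2024-09-24T01:41:31Z","jobCompleted":"2024-09-24T01:41:32Z"}}}"""
    ).toOption.get

    val base = root.data.base.string.getOption(json).get.stripPrefix("file://")
    // base == "/tmp/out/run=2024-09-24-01-41-31-abc/"
    val jobStarted = root.data.timestamps.jobStarted.string.getOption(json).get
    // jobStarted == "2024-09-24T01:41:31Z"
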
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0", | "data": { | "cacheSize": 500, - | "repositories": [] + | "repositories": [ + | ] | } |}""".stripMargin } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala index e7d1efac6..671969620 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala @@ -10,9 +10,8 @@ */ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec.{appConfig, igluConfig} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec._ import cats.effect.unsafe.implicits.global import fs2.io.file.Path @@ -31,43 +30,32 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) actualAtomicRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" ) actualOptimizelyRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0" ) actualConsentRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0" ) actualBadRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0" - ) + compVars.badPath / "vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0" ) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", compVars) expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic") expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state") 
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
index e7d1efac6..671969620 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
@@ -10,9 +10,8 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing
 
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec.{appConfig, igluConfig}
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec._
 
 import cats.effect.unsafe.implicits.global
 import fs2.io.file.Path
@@ -31,43 +30,32 @@
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
 
         actualAtomicRows <- readStringRowsFrom(
-                              Path(
-                                outputDirectory.toString +
-                                  s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
-                              )
+                              compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
                             )
 
         actualOptimizelyRows <- readStringRowsFrom(
-                                  Path(
-                                    outputDirectory.toString +
-                                      s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
-                                  )
+                                  compVars.goodPath / "vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
                                 )
 
         actualConsentRows <- readStringRowsFrom(
-                               Path(
-                                 outputDirectory.toString +
-                                   s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
-                               )
+                               compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
                              )
 
        actualBadRows <- readStringRowsFrom(
-                           Path(
-                             outputDirectory.toString +
-                               s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
-                           )
+                           compVars.badPath / "vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
                          )
 
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", compVars)
         expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic")
         expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state")
         expectedConsentRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document")
         expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
       } yield {
-        removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(Vector(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows)
@@ -91,28 +79,23 @@
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
 
         actualAtomicRows <- readStringRowsFrom(
-                              Path(
-                                outputDirectory.toString +
-                                  s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
-                              )
+                              compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
                             )
 
         actualBadRows <- readStringRowsFrom(
-                           Path(
-                             outputDirectory.toString +
-                               s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
-                           )
+                           compVars.badPath / "vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
                          )
 
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/3/output/completion.json", outputDirectory)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/3/output/completion.json", compVars)
         expectedBadRows <- readLinesFromResource("/processing-spec/3/output/bad")
       } yield {
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         actualAtomicRows.size must beEqualTo(1)
-        assertStringRows(removeAppId(actualBadRows), expectedBadRows)
+        assertStringRows(removeLastAttempt(removeAppId(actualBadRows)), expectedBadRows)
       }
     }
       .unsafeRunSync()
@@ -156,4 +139,7 @@ object ShredTsvProcessingSpec {
      |    "repositories": []
      |  }
      |}""".stripMargin
+
+  def removeLastAttempt(badRows: List[String]): List[String] =
+    badRows.map(_.replaceAll(""""lastAttempt":".{20}"""", """"lastAttempt":""""""))
 }
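The new removeLastAttempt helper exists because lastAttempt in the bad row's lookupHistory is now a real clock reading rather than the mocked 1970-01-01T10:30:00Z, so it is blanked before comparing (the fixture above already carries "lastAttempt":""). The .{20} works because a second-precision UTC instant is exactly 20 characters long. For example (illustrative timestamp):

    // "2024-09-24T01:41:31Z".length == 20, so ".{20}" covers the whole instant
    val raw      = """{"lastAttempt":"2024-09-24T01:41:31Z"}"""
    val scrubbed = raw.replaceAll(""""lastAttempt":".{20}"""", """"lastAttempt":""""")
    // scrubbed == """{"lastAttempt":""}"""
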
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala
index c167c0821..ea65d29bc 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala
@@ -37,9 +37,10 @@
 
       for {
         output                    <- runWithShutdown(inputStream, config)
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/8/output/completion.json", outputDirectory)
+        compVars                   = extractCompletionMessageVars(output)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/8/output/completion.json", compVars)
       } yield {
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
       }
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala
index 8c3f47b10..6cc16ec3f 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala
@@ -10,7 +10,6 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing
 
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowJsonProcessingSpec.{appConfig, igluConfig}
 
@@ -28,20 +27,19 @@
         inputEventsPath = "/processing-spec/1/input/events"
       )
 
-      val config   = TransformerConfig(appConfig(outputDirectory), igluConfig)
-      val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good")
-      val badPath  = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad")
+      val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
 
       for {
         output <- process(inputStream, config)
-        actualGoodRows <- readStringRowsFrom(goodPath)
-        actualBadRows <- readStringRowsFrom(badPath)
+        compVars = extractCompletionMessageVars(output)
+        actualGoodRows <- readStringRowsFrom(compVars.goodPath)
+        actualBadRows <- readStringRowsFrom(compVars.badPath)
 
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/widerow/completion.json", outputDirectory)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/widerow/completion.json", compVars)
         expectedGoodRows <- readLinesFromResource("/processing-spec/1/output/good/widerow/events")
         expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
       } yield {
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
         assertStringRows(actualGoodRows, expectedGoodRows)
         assertStringRows(actualBadRows, expectedBadRows)
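Throughout these specs, the hand-built s"run=1970-01-01-10-30-00-${AppId.appId}/output=good" paths give way to CompletionMessageVars.goodPath and badPath, which derive from whatever base the completion message actually reports. An illustration (the base value is hypothetical):

    import fs2.io.file.Path

    val vars = BaseProcessingSpec.CompletionMessageVars(
      base         = Path("/tmp/out/run=2024-09-24-01-41-31-abc"),
      jobStarted   = "2024-09-24T01:41:31Z",
      jobCompleted = "2024-09-24T01:41:32Z"
    )
    // vars.goodPath == Path("/tmp/out/run=2024-09-24-01-41-31-abc/output=good")
    // vars.badPath  == Path("/tmp/out/run=2024-09-24-01-41-31-abc/output=bad")
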
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala
index 6e5acc28b..5a6a3f95d 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala
@@ -14,7 +14,7 @@ import cats.effect.IO
 import cats.effect.unsafe.implicits.global
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{AppId, ParquetUtils}
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.{readParquetColumns, readParquetRowsAsJsonFrom}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowParquetProcessingSpec.{appConfig, igluConfig}
@@ -42,28 +42,26 @@
         inputEventsPath = "/processing-spec/4/input/events" // the same events as in resource file used in WideRowParquetSpec for batch transformer
       )
-
-      val config   = TransformerConfig(appConfig(outputDirectory), igluConfig)
-      val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good")
-      val badPath  = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad")
+      val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
         expectedParquetColumns <- readParquetColumnsFromResource(
                                     "/processing-spec/4/output/good/parquet/schema"
                                   ) // the same schema as in resource file used in WideRowParquetSpec for batch transformer
-        actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
-        actualParquetColumns = readParquetColumns(goodPath)
-        actualBadRows <- readStringRowsFrom(badPath)
+        actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns)
+        actualParquetColumns = readParquetColumns(compVars.goodPath)
+        actualBadRows <- readStringRowsFrom(compVars.badPath)
 
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/4/output/good/parquet/completion.json", outputDirectory)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/4/output/good/parquet/completion.json", compVars)
         expectedBadRows <- readLinesFromResource("/processing-spec/4/output/badrows")
         expectedParquetRows <- readGoodParquetEventsFromResource("/processing-spec/4/input/events", columnToAdjust = None)
       } yield {
         actualParquetRows.size must beEqualTo(46)
         actualBadRows.size must beEqualTo(4)
 
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         assertParquetRows(actualParquetRows, expectedParquetRows)
@@ -82,15 +80,15 @@
         "/processing-spec/5/input/input-events-custom-contexts" // the same events as in resource file used in WideRowParquetSpec for batch transformer
       )
 
-      val config   = TransformerConfig(appConfig(outputDirectory), igluConfig)
-      val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good")
+      val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
         expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/5/output/good/parquet/schema")
-        actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
-        actualParquetColumns = readParquetColumns(goodPath)
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", outputDirectory)
+        actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns)
+        actualParquetColumns = readParquetColumns(compVars.goodPath)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", compVars)
         expectedParquetRows <- readGoodParquetEventsFromResource(
                                  "/processing-spec/5/input/input-events-custom-contexts",
                                  columnToAdjust = Some("contexts_com_snowplowanalytics_snowplow_parquet_test_a_1")
                                )
@@ -98,7 +96,7 @@
       } yield {
         actualParquetRows.size must beEqualTo(100)
 
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         assertParquetRows(actualParquetRows, expectedParquetRows)
@@ -115,15 +113,15 @@
         "/processing-spec/6/input/input-events-custom-unstruct" // the same events as in resource file used in WideRowParquetSpec for batch transformer
       )
 
-      val config   = TransformerConfig(appConfig(outputDirectory), igluConfig)
-      val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good")
+      val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
         expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/6/output/good/parquet/schema")
-        actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
-        actualParquetColumns = readParquetColumns(goodPath)
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", outputDirectory)
+        actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns)
+        actualParquetColumns = readParquetColumns(compVars.goodPath)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", compVars)
 
         expectedParquetRows <- readGoodParquetEventsFromResource(
                                  "/processing-spec/6/input/input-events-custom-unstruct",
@@ -132,7 +130,7 @@
       } yield {
         actualParquetRows.size must beEqualTo(100)
 
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         assertParquetRows(actualParquetRows, expectedParquetRows)
@@ -149,21 +147,21 @@
         "/processing-spec/7/input/events" // the same events as in resource file used in WideRowParquetSpec for batch transformer
       )
 
-      val config   = TransformerConfig(appConfig(outputDirectory), igluConfig)
-      val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good")
+      val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
 
       for {
         output <- process(inputStream, config)
+        compVars = extractCompletionMessageVars(output)
         expectedParquetColumns <- readParquetColumnsFromResource(
                                     "/processing-spec/7/output/good/parquet/schema"
                                  ) // the same schema as in resource file used in WideRowParquetSpec for batch transformer
-        actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns)
-        actualParquetColumns = readParquetColumns(goodPath)
-        expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", outputDirectory)
+        actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns)
+        actualParquetColumns = readParquetColumns(compVars.goodPath)
+        expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", compVars)
       } yield {
         actualParquetRows.size must beEqualTo(3)
 
-        removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage))
+        output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage))
         output.checkpointed must beEqualTo(1)
 
         forall(
diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala
index 82d16152a..f417b8614 100644
--- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala
+++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala
@@ -335,16 +335,6 @@ object ShredJobSpec {
       |"cacheSize": 500,
       |"repositories": [
       |{
-      |"name": "Local Iglu Server",
-      |"priority": 0,
-      |"vendorPrefixes": [ "com.snowplowanalytics" ],
-      |"connection": {
-      |"http": {
-      |"uri": "http://localhost:8080/api"
-      |}
-      |}
-      |},
-      |{
       |"name": "Iglu Central",
       |"priority": 0,
       |"vendorPrefixes": [ "com.snowplowanalytics" ],
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index a3125cf4c..fca329f8b 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -178,6 +178,7 @@ object BuildSettings {
       "iglu:com.segment/screen/jsonschema/1-0-0",
       "iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0",
       "iglu:com.snowplowanalytics.snowplow/change_form/jsonschema/1-0-0",
+      "iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-0",
      "iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1",
       "iglu:com.snowplowanalytics.snowplow/consent_document/jsonschema/1-0-0",
       "iglu:com.snowplowanalytics.snowplow/consent_withdrawn/jsonschema/1-0-0",
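For context, not part of the patch: the run directory name embeds the window start plus the AppId suffix, which is why asserting a fixed run=1970-01-01-10-30-00 path only worked under a mocked clock. If a test ever needs the window timestamp itself, it can be recovered from the base path the completion message reports; an illustrative sketch:

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    // Hypothetical run folder taken from a completion message's base path
    val runFolder = "run=2024-09-24-01-40-00-abc123"
    val windowStart = LocalDateTime.parse(
      runFolder.stripPrefix("run=").take(19), // "2024-09-24-01-40-00"
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")
    )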