Skip to content

Commit

Permalink
Transformer Stream: don't use hard-coded window timestamps in the tests
Browse files Browse the repository at this point in the history
Some of the tests are failing since windows don't have expected timestamps.
In order to solve this problem, this commit makes necessary changes to read
window timestamps from shredded message instead of using hard-coded values.
  • Loading branch information
spenes committed Sep 28, 2024
1 parent 17f297a commit 6de4e2e
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "SHREDDED",
"types": [
Expand Down Expand Up @@ -113,8 +113,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "JSON",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"1970-01-01T10:30:00Z"}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaKey":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":""}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "SHREDDED",
"types": []
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2014-05-29T18:16:35Z",
"max": "2014-05-29T18:16:35Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-09-17T09:05:28.590000001Z",
"max": "2021-10-15T09:06:27.101185600Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -105,8 +105,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:14:21.648Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -101,8 +101,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:14:21.648Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand All @@ -21,8 +21,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:32:41.069Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "JSON",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:31:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce
import cats.effect.{IO, Resource}
import cats.effect.kernel.Ref

import io.circe.optics.JsonPath._
import io.circe.parser.{parse => parseCirce}

import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.FileUtils
Expand Down Expand Up @@ -58,11 +61,15 @@ trait BaseProcessingSpec extends Specification {
}
.reduce(_ and _)

protected def readMessageFromResource(resource: String, outputRootDirectory: Path) =
protected def readMessageFromResource(resource: String, outputRootDirectory: Path): IO[String] = ???

protected def readMessageFromResource(resource: String, completionMessageVars: BaseProcessingSpec.CompletionMessageVars) =
readLinesFromResource(resource)
.map(_.mkString)
.map(
_.replace("output_path_placeholder", outputRootDirectory.toNioPath.toUri.toString.replaceAll("/+$", ""))
_.replace("output_path_placeholder", completionMessageVars.base.toNioPath.toUri.toString)
.replace("job_started_placeholder", completionMessageVars.jobStarted)
.replace("job_completed_placeholder", completionMessageVars.jobCompleted)
.replace("version_placeholder", BuildInfo.version)
.replace(" ", "")
)
Expand All @@ -86,6 +93,15 @@ trait BaseProcessingSpec extends Specification {
new String(encoder.encode(config.app.replace("file:/", "s3:/").getBytes))
)
}

def extractCompletionMessageVars(processingOutput: BaseProcessingSpec.ProcessingOutput): BaseProcessingSpec.CompletionMessageVars = {
val message = processingOutput.completionMessages.head
val json = parseCirce(message).toOption.get
val base = root.data.base.string.getOption(json).get.stripPrefix("file://")
val jobStarted = root.data.timestamps.jobStarted.string.getOption(json).get
val jobCompleted = root.data.timestamps.jobCompleted.string.getOption(json).get
BaseProcessingSpec.CompletionMessageVars(Path(base), jobStarted, jobCompleted)
}
}

object BaseProcessingSpec {
Expand All @@ -96,5 +112,13 @@ object BaseProcessingSpec {
badrowsFromQueue: Vector[String],
checkpointed: Int
)
final case class CompletionMessageVars(
base: Path,
jobStarted: String,
jobCompleted: String
) {
def goodPath: Path = Path(s"$base/output=good")
def badPath: Path = Path(s"$base/output=bad")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing

import cats.effect.unsafe.implicits.global
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.QueueBadSinkSpec._
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
import fs2.io.file.Path
Expand Down Expand Up @@ -39,12 +38,12 @@ class QueueBadSinkSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)
val badDirectory = outputDirectory.resolve(s"run=1970-01-01-10-30-00-${AppId.appId}/output=bad")
val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)

for {
output <- process(inputStream, config)
badDirectoryExists <- pathExists(badDirectory)
compVars = extractCompletionMessageVars(output)
badDirectoryExists <- pathExists(compVars.badPath)
expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
} yield {
val actualBadRows = output.badrowsFromQueue.toList
Expand Down Expand Up @@ -98,7 +97,8 @@ object QueueBadSinkSpec {
| "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
| "data": {
| "cacheSize": 500,
| "repositories": []
| "repositories": [
| ]
| }
|}""".stripMargin
}
Loading

0 comments on commit 6de4e2e

Please sign in to comment.