Skip to content

Commit

Permalink
Merge pull request #305 from uptane/update-akka
Browse files Browse the repository at this point in the history
Update akka
  • Loading branch information
simao committed Oct 4, 2023
2 parents 866bc9a + 7ad307a commit 4edd595
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
33 changes: 29 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ on: [push, pull_request]
env:
SBT_OPTS: "-Dsbt.color=true -Dscala.color=true"
KAFKA_HOST: kafka:9092
KAFKA_ZOOKEEPER_CONNECT: kafka:2181
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
jobs:
run-tests:
name: Run tests
runs-on: ubuntu-latest
container: uptane/ci:jvm17-latest
services:
kafka:
image: spotify/kafka

db:
image: mariadb:10.4
env:
Expand All @@ -21,6 +18,34 @@ jobs:
MYSQL_PASSWORD: libats
MYSQL_DATABASE: libats

zookeeper:
image: bitnami/zookeeper:3.8.1
ports:
- 2181:2181
env:
ALLOW_ANONYMOUS_LOGIN: yes
options: >-
--health-cmd "echo mntr | nc -w 2 -q 2 localhost 2181"
--health-interval 10s
--health-timeout 5s
--health-retries 5
kafka:
image: bitnami/kafka:2.8.1
ports:
- 9092:9092
options: >-
--health-cmd "kafka-broker-api-versions.sh --version"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ENABLE_KRAFT: "no"

env:
DB_URL: "jdbc:mariadb://db:3306/libats"

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

val Library = new {
object Version {
val akka = "2.6.20"
val akkaHttp = "10.2.10"
val akka = "2.8.5"
val akkaHttp = "10.5.2"
val akkaHttpCirce = "1.39.2"
val circe = "0.14.6"
val refined = "0.11.0"
Expand Down
2 changes: 1 addition & 1 deletion libats-messaging/build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name := "libats-messaging"

// Alpakka Kafka (akka-stream-kafka) 4.0.2 — matches the Akka 2.8.x / Akka HTTP
// 10.5.x upgrade done in this commit's root build.sbt. The previous 2.1.0 entry
// targeted the old Akka 2.6 line and has been removed.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka" % "4.0.2"
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import akka.testkit.TestKit
import com.advancedtelematic.libats.messaging.kafka.{JsonDeserializerException, KafkaClient}
import com.advancedtelematic.libats.messaging_datatype.MessageLike
import com.typesafe.config.ConfigFactory
import io.circe.syntax._
import io.circe.syntax.*
import io.circe.{Decoder, Encoder, Json}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures}
import org.scalatest.time.{Millis, Seconds, Span}
Expand All @@ -23,15 +23,23 @@ import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

import java.time.Instant
import scala.jdk.CollectionConverters._
import scala.annotation.tailrec
import scala.jdk.CollectionConverters.*
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.duration.*


// Five distinct message types, one per test, so each test publishes and
// consumes on its own Kafka stream and cannot observe another test's
// leftover messages or committed offsets.
case class KafkaSpecMessage0(payload: String)
case class KafkaSpecMessage1(payload: String)
case class KafkaSpecMessage2(payload: String)
case class KafkaSpecMessage3(payload: String)
case class KafkaSpecMessage4(payload: String)

object KafkaSpecMessage {
  // Each MessageLike uses a constant id function; the per-type stream name is
  // what isolates the tests from each other.
  implicit val messageLike0: MessageLike[KafkaSpecMessage0] = MessageLike.derive(_ => "KafkaSpecMessage0")
  implicit val messageLike1: MessageLike[KafkaSpecMessage1] = MessageLike.derive(_ => "KafkaSpecMessage1")
  implicit val messageLike2: MessageLike[KafkaSpecMessage2] = MessageLike.derive(_ => "KafkaSpecMessage2")
  implicit val messageLike3: MessageLike[KafkaSpecMessage3] = MessageLike.derive(_ => "KafkaSpecMessage3")
  implicit val messageLike4: MessageLike[KafkaSpecMessage4] = MessageLike.derive(_ => "KafkaSpecMessage4")
}

class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
Expand All @@ -42,25 +50,27 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
with PatienceConfiguration
with Eventually {

import KafkaSpecMessage.*

implicit val _ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

override implicit def patienceConfig = PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Millis))
override implicit def patienceConfig = PatienceConfig(timeout = Span(10, Seconds), interval = Span(500, Millis))

val publisher = KafkaClient.publisher(system, system.settings.config)

lazy val commiterSettings = CommitterSettings(ConfigFactory.load().getConfig("ats.messaging.kafka.committer"))

test("can send an event to bus") {
  // Publish on the dedicated KafkaSpecMessage1 stream so this test does not
  // interact with messages produced by the other tests.
  val testMsg = KafkaSpecMessage1(Instant.now.toString)
  // Map the completed publish to a sentinel value; futureValue both awaits
  // the future and fails the test if publishing raised an error.
  val f = publisher.publish(testMsg).map(_ => 0)
  f.futureValue shouldBe 0
}

test("can send-receive events from bus") {
val testMsg = KafkaSpecMessage(2, Instant.now.toString)
val testMsg = KafkaSpecMessage2(Instant.now.toString)

val flow = Flow[KafkaSpecMessage].mapAsync(1)((_: KafkaSpecMessage) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage](system.settings.config, commiterSettings, "kafka-test", flow)
val flow = Flow[KafkaSpecMessage2].mapAsync(1)((_: KafkaSpecMessage2) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage2](system.settings.config, commiterSettings, "kafka-test", flow)
val msgFuture = source.groupedWithin(10, 5.seconds).runWith(Sink.head)

for {
Expand All @@ -72,15 +82,15 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
}

test("can send-receive and commit events from bus") {
val testMsg = KafkaSpecMessage(3, Instant.now.toString)
val testMsg = KafkaSpecMessage3(Instant.now.toString)

val cfg = ConfigFactory.parseString(
"""
|messaging.listener.parallelism=2
""".stripMargin).withFallback(system.settings.config)

val flow = Flow[KafkaSpecMessage].mapAsync(1)((_: KafkaSpecMessage) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage](cfg, commiterSettings, "kafka-test", flow)
val flow = Flow[KafkaSpecMessage3].mapAsync(1)((_: KafkaSpecMessage3) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage3](cfg, commiterSettings, "kafka-test", flow)

val msgFuture = source.runWith(Sink.head)

Expand All @@ -96,7 +106,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
val testMsgJson = Json.obj("not-valid" -> 0.asJson)

val jsonMsgLike = new MessageLike[Json]() {
override def streamName: String = implicitly[MessageLike[KafkaSpecMessage]].streamName // Push a bad json to the KafkaSpecMessage stream
override def streamName: String = implicitly[MessageLike[KafkaSpecMessage0]].streamName // Push a bad json to the KafkaSpecMessage stream

override def id(v: Json): String = "0L"

Expand All @@ -106,22 +116,23 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))

val cfg = ConfigFactory.parseMap(Map("ats.messaging.kafka.skipJsonErrors" -> false).asJava).withFallback(system.settings.config)

val flow = Flow[KafkaSpecMessage].mapAsync(1)((_: KafkaSpecMessage) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage](cfg, commiterSettings, "kafka-test", flow)
val flow = Flow[KafkaSpecMessage0].mapAsync(1)((_: KafkaSpecMessage0) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage0](cfg, commiterSettings, "kafka-test", flow)

publisher.publish(testMsgJson)(implicitly, jsonMsgLike).futureValue
for {
_ <- akka.pattern.after(3.seconds)(Future.successful(()))
_ <- publisher.publish(testMsgJson)(implicitly, jsonMsgLike)
} yield ()

eventually {
val msgFuture = source.runWith(Sink.head)
msgFuture.failed.futureValue shouldBe a[JsonDeserializerException]
}
val msgFuture = source.runWith(Sink.head)
msgFuture.failed.futureValue shouldBe a[JsonDeserializerException]
}

test("skips error when json cannot be deserialized") {
val testMsgJson = Json.obj("not-valid" -> 0.asJson)

val jsonMsgLike = new MessageLike[Json]() {
override def streamName: String = implicitly[MessageLike[KafkaSpecMessage]].streamName // Push a bad json to the KafkaSpecMessage stream
override def streamName: String = implicitly[MessageLike[KafkaSpecMessage4]].streamName // Push a bad json to the KafkaSpecMessage stream

override def id(v: Json): String = "0L"

Expand All @@ -131,17 +142,18 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))

val cfg = system.settings.config

val flow = Flow[KafkaSpecMessage].mapAsync(1)((_: KafkaSpecMessage) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage](cfg, commiterSettings, "kafka-test", flow)
val flow = Flow[KafkaSpecMessage4].mapAsync(1)((_: KafkaSpecMessage4) => FastFuture.successful(Done))
val source = KafkaClient.committableSource[KafkaSpecMessage4](cfg, commiterSettings, "kafka-test", flow)

val testMsg = KafkaSpecMessage(5, Instant.now.toString)
val testMsg = KafkaSpecMessage4(Instant.now.toString)

publisher.publish(testMsgJson)(implicitly, jsonMsgLike).futureValue
publisher.publish(testMsg).futureValue
for {
_ <- akka.pattern.after(3.seconds)(Future.successful(()))
_ <- publisher.publish(testMsgJson) (implicitly, jsonMsgLike)
_ <- publisher.publish(testMsg)
} yield ()

eventually {
val msgFuture = source.runWith(Sink.head)
msgFuture.futureValue should equal(testMsg)
}
val msgFuture = source.runWith(Sink.head)
msgFuture.futureValue should equal(testMsg)
}
}

0 comments on commit 4edd595

Please sign in to comment.