Merge pull request #306 from uptane/update-akka
Add api to set `group.instance.id`
simao committed Oct 9, 2023
2 parents 4edd595 + b3ba402 commit a84f8be
Showing 4 changed files with 62 additions and 40 deletions.
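
At a glance, the change threads an optional `groupInstanceId` through the consumer-side APIs and forwards it to the Kafka client's `group.instance.id`. A minimal sketch of the new call shape, based on the signatures in the diff below; `MyMessage`, `config` and the group names are illustrative, and an implicit `MessageLike[MyMessage]` is assumed to be in scope:

  import akka.NotUsed
  import akka.stream.scaladsl.Source

  // Previous behaviour, now spelled explicitly: a dynamic group member
  val dynamicMember: Source[MyMessage, NotUsed] =
    KafkaClient.source[MyMessage](config, "my-service", groupInstanceId = None)

  // New: a static group member that keeps its partitions across quick restarts
  val staticMember: Source[MyMessage, NotUsed] =
    KafkaClient.source[MyMessage](config, "my-service", groupInstanceId = Some("my-service-0"))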
libats-messaging/src/main/resources/reference.conf (1 addition, 0 deletions)
@@ -12,6 +12,7 @@ ats.messaging {
host = ${?KAFKA_HOST}

committer = ${akka.kafka.committer}
+ consumer = ${akka.kafka.consumer}
}
listener {
parallelism = 1
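
The added `consumer = ${akka.kafka.consumer}` line mirrors the existing `committer` wiring: it exposes the stock akka-kafka consumer defaults under `ats.messaging.kafka.consumer`, which `buildSource` reads further down. A sketch of layering a service-specific override on top of those defaults; the property and value here are illustrative:

  import com.typesafe.config.ConfigFactory

  val cfg = ConfigFactory
    .parseString("ats.messaging.kafka.consumer.kafka-clients.session.timeout.ms = 30000")
    .withFallback(ConfigFactory.load())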
@@ -63,7 +63,7 @@ object MessageBus {
case "kafka" =>
log.info("Starting messaging mode: Kafka")
log.info(s"Using stream name: ${messageLike.streamName}")
- KafkaClient.source(system, config, groupId)(messageLike)
+ KafkaClient.source(config, groupId, groupInstanceId = None)(messageLike)
case "local" | "test" =>
log.info("Using local event bus")
LocalMessageBus.subscribe(system, config, op)
@@ -83,7 +83,7 @@
val processingFlow = Flow[T].mapAsync[Any](listenerParallelism)(op)
val committerSettings = CommitterSettings(config.getConfig("ats.messaging.kafka.committer"))

- KafkaClient.committableSource[T](config, committerSettings, groupIdPrefix, processingFlow).mapMaterializedValue(_ => NotUsed)
+ KafkaClient.committableSource[T](config, committerSettings, groupIdPrefix, groupInstanceId = None, processingFlow).mapMaterializedValue(_ => NotUsed)

case "local" | "test" =>
log.info("Using local event bus")
@@ -11,17 +11,18 @@ import akka.event.Logging
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.scaladsl.{Committer, Consumer}
- import akka.kafka._
+ import akka.kafka.*
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.advancedtelematic.libats.messaging.MessageBusPublisher
import com.advancedtelematic.libats.messaging.metrics.KafkaMetrics
import com.advancedtelematic.libats.messaging_datatype.MessageLike
import com.typesafe.config.Config
- import io.circe.syntax._
+ import io.circe.syntax.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
- import org.apache.kafka.common.serialization._
+ import org.apache.kafka.common.serialization.*

+ import java.time.temporal.ChronoUnit
import scala.concurrent.{ExecutionContext, Future, Promise}

object KafkaClient {
Expand All @@ -31,45 +32,55 @@ object KafkaClient {
val kafkaProducer = producer(config)(system)

new MessageBusPublisher {
-     override def publish[T](msg: T)(implicit ex: ExecutionContext, messageLike: MessageLike[T]): Future[Unit] = {
+     override def publish[T](msg: T)(
+       implicit ex: ExecutionContext,
+       messageLike: MessageLike[T]): Future[Unit] = {
val promise = Promise[RecordMetadata]()

val topic = topicNameFn(messageLike.streamName)

-       val record = new ProducerRecord[Array[Byte], String](topic,
-         messageLike.id(msg).getBytes, msg.asJson(messageLike.encoder).noSpaces)
-
-       kafkaProducer.send(record, new Callback {
-         override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-           if (exception != null)
-             promise.failure(exception)
-           else if (metadata != null)
-             promise.success(metadata)
-           else
-             promise.failure(new Exception("Unknown error occurred, no metadata or error received"))
+       val record = new ProducerRecord[Array[Byte], String](
+         topic,
+         messageLike.id(msg).getBytes,
+         msg.asJson(messageLike.encoder).noSpaces)
+
+       //noinspection ConvertExpressionToSAM
+       kafkaProducer.send(
+         record,
+         new Callback {
+           override def onCompletion(metadata: RecordMetadata,
+                                     exception: Exception): Unit = {
+             if (exception != null)
+               promise.failure(exception)
+             else if (metadata != null)
+               promise.success(metadata)
+             else
+               promise.failure(
+                 new Exception(
+                   "Unknown error occurred, no metadata or error received"))
+           }
}
-       })
+       )

promise.future.map(_ => ())
}
}
}

+ def source[T](config: Config, groupId: String, groupInstanceId: Option[String])(
+     implicit ml: MessageLike[T]): Source[T, NotUsed] =
+   plainSource(config, groupId, groupInstanceId)(ml).mapMaterializedValue(_ => NotUsed)

- def source[T](system: ActorSystem, config: Config, groupId: String)
-              (implicit ml: MessageLike[T]): Source[T, NotUsed] =
-   plainSource(config, groupId)(ml, system).mapMaterializedValue(_ => NotUsed)
-
- private def plainSource[T](config: Config, groupIdPrefix: String)
-                           (implicit ml: MessageLike[T], system: ActorSystem): Source[T, Control] = {
-   val (consumerSettings, subscriptions) = buildSource(config, groupIdPrefix)
+ private def plainSource[T](config: Config, groupIdPrefix: String, groupInstanceId: Option[String])
+                           (implicit ml: MessageLike[T]): Source[T, Control] = {
+   val (consumerSettings, subscriptions) = buildSource(config, groupIdPrefix, groupInstanceId)
val settings = consumerSettings.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
Consumer.plainSource(settings, subscriptions).map(_.value()).filter(_ != null)
}

- def committableSource[T](config: Config, committerSettings: CommitterSettings, groupIdPrefix: String, processingFlow: Flow[T, Any, NotUsed])
+ def committableSource[T](config: Config, committerSettings: CommitterSettings, groupIdPrefix: String, groupInstanceId: Option[String], processingFlow: Flow[T, Any, NotUsed])
(implicit ml: MessageLike[T], system: ActorSystem): Source[T, Control] = {
-   val (cfgSettings, subscriptions) = buildSource(config, groupIdPrefix)
+   val (cfgSettings, subscriptions) = buildSource(config, groupIdPrefix, groupInstanceId)
val log = Logging.getLogger(system, this.getClass)

val committerSink = Committer.sink(committerSettings)
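
The reformatted publish above is behaviourally unchanged: it bridges the Kafka producer's callback API into a Future by completing a Promise from onCompletion. Caller-side, that amounts to this sketch (generic, since any T with a MessageLike will do):

  import scala.concurrent.{ExecutionContext, Future}

  def publishAndForget[T](publisher: MessageBusPublisher, msg: T)(
      implicit ec: ExecutionContext, ml: MessageLike[T]): Future[Unit] =
    publisher.publish(msg) // completes when the broker acks, fails on send error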
@@ -78,33 +89,42 @@ object KafkaClient {
.filter(_.record.value() != null)
.map { msg => log.debug(s"Parsed ${msg.record.value()}") ; msg }
.alsoTo {
-       Flow[CommittableMessage[_, T]]
+       Flow[CommittableMessage[?, T]]
.map(_.record.value())
.via(processingFlow)
.to(Sink.ignore)
}
.alsoTo {
-       Flow[CommittableMessage[_, _]]
+       Flow[CommittableMessage[?, T]]
.map(_.committableOffset)
.to(committerSink)
}
.map(_.record.value())
}

- private def buildSource[T, M](config: Config, groupIdPrefix: String)
-                              (implicit system: ActorSystem, ml: MessageLike[M]): (ConsumerSettings[Array[Byte], M], Subscription) = {
+ // groupInstanceId is used as the kafka client's group.instance.id; when set, the
+ // consumer joins the group as a static member rather than a dynamic one. See the Kafka docs.
+ private def buildSource[M](config: Config, groupIdPrefix: String, groupInstanceId: Option[String] = None)
+                           (implicit ml: MessageLike[M]): (ConsumerSettings[Array[Byte], M], Subscription) = {
val topicFn = topic(config)
val consumerSettings = {
val host = config.getString("ats.messaging.kafka.host")
val groupId = groupIdPrefix + "-" + topicFn(ml.streamName)
-   val skipJsonErrors = config.getBoolean("ats.messaging.kafka.skipJsonErrors")
+   val skipJsonErrors =
+     config.getBoolean("ats.messaging.kafka.skipJsonErrors")
+
+   val consumerConfig = config.getConfig("ats.messaging.kafka.consumer")

-   ConsumerSettings(system, new ByteArrayDeserializer, new JsonDeserializer(ml.decoder, throwException = ! skipJsonErrors))
+   ConsumerSettings(consumerConfig,
+                    new ByteArrayDeserializer,
+                    new JsonDeserializer(ml.decoder,
+                                         throwException = !skipJsonErrors))
.withBootstrapServers(host)
.withGroupId(groupId)
.withClientId(s"consumer-$groupId")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.withProperty("metric.reporters", classOf[KafkaMetrics].getName)
+     .withGroupInstanceId(groupInstanceId.orNull)
}

val subscription = Subscriptions.topics(topicFn(ml.streamName))
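
For background on the new setting: a consumer with `group.instance.id` is a static group member, so if it drops out and rejoins within `session.timeout.ms` the broker hands back its partitions without a full rebalance (Kafka 2.3+). The `.withGroupInstanceId(groupInstanceId.orNull)` call above is the akka-kafka equivalent of the raw client property, sketched here with illustrative values:

  import java.util.Properties
  import org.apache.kafka.clients.consumer.ConsumerConfig

  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service-DeviceSeen-topic") // groupIdPrefix + "-" + topic, as built above
  props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-service-0")       // enables static membership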
@@ -114,11 +134,12 @@

private[this] def topic(config: Config): String => String = {
val suffix = config.getString("ats.messaging.kafka.topicSuffix")
-   (streamName: String) => streamName + "-" + suffix
+   (streamName: String) =>
+     streamName + "-" + suffix
}

- private[this] def producer(config: Config)
-                           (implicit system: ActorSystem): KafkaProducer[Array[Byte], String] =
+ private[this] def producer(config: Config)(
+     implicit system: ActorSystem): KafkaProducer[Array[Byte], String] =
ProducerSettings.createKafkaProducer(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(config.getString("ats.messaging.kafka.host"))
@@ -70,7 +70,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
val testMsg = KafkaSpecMessage2(Instant.now.toString)

val flow = Flow[KafkaSpecMessage2].mapAsync(1)((_: KafkaSpecMessage2) => FastFuture.successful(Done))
- val source = KafkaClient.committableSource[KafkaSpecMessage2](system.settings.config, commiterSettings, "kafka-test", flow)
+ val source = KafkaClient.committableSource[KafkaSpecMessage2](system.settings.config, commiterSettings, "kafka-test", groupInstanceId = None, flow)
val msgFuture = source.groupedWithin(10, 5.seconds).runWith(Sink.head)

for {
@@ -90,7 +90,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
""".stripMargin).withFallback(system.settings.config)

val flow = Flow[KafkaSpecMessage3].mapAsync(1)((_: KafkaSpecMessage3) => FastFuture.successful(Done))
- val source = KafkaClient.committableSource[KafkaSpecMessage3](cfg, commiterSettings, "kafka-test", flow)
+ val source = KafkaClient.committableSource[KafkaSpecMessage3](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow)

val msgFuture = source.runWith(Sink.head)

@@ -117,7 +117,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
val cfg = ConfigFactory.parseMap(Map("ats.messaging.kafka.skipJsonErrors" -> false).asJava).withFallback(system.settings.config)

val flow = Flow[KafkaSpecMessage0].mapAsync(1)((_: KafkaSpecMessage0) => FastFuture.successful(Done))
- val source = KafkaClient.committableSource[KafkaSpecMessage0](cfg, commiterSettings, "kafka-test", flow)
+ val source = KafkaClient.committableSource[KafkaSpecMessage0](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow)

for {
_ <- akka.pattern.after(3.seconds)(Future.successful(()))
@@ -143,7 +143,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec"))
val cfg = system.settings.config

val flow = Flow[KafkaSpecMessage4].mapAsync(1)((_: KafkaSpecMessage4) => FastFuture.successful(Done))
- val source = KafkaClient.committableSource[KafkaSpecMessage4](cfg, commiterSettings, "kafka-test", flow)
+ val source = KafkaClient.committableSource[KafkaSpecMessage4](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow)

val testMsg = KafkaSpecMessage4(Instant.now.toString)
