From b3ba402d8231106d3b32d0a4d8c85ba1126660c1 Mon Sep 17 00:00:00 2001 From: Simao Mata Date: Mon, 9 Oct 2023 16:42:55 +0100 Subject: [PATCH] Add api to set `group.instance.id` --- .../src/main/resources/reference.conf | 1 + .../libats/messaging/MessageBus.scala | 4 +- .../libats/messaging/kafka/KafkaClient.scala | 89 ++++++++++++------- .../kakfa/KafkaClientIntegrationSpec.scala | 8 +- 4 files changed, 62 insertions(+), 40 deletions(-) diff --git a/libats-messaging/src/main/resources/reference.conf b/libats-messaging/src/main/resources/reference.conf index cf1e32fd..481412bd 100644 --- a/libats-messaging/src/main/resources/reference.conf +++ b/libats-messaging/src/main/resources/reference.conf @@ -12,6 +12,7 @@ ats.messaging { host = ${?KAFKA_HOST} committer = ${akka.kafka.committer} + consumer = ${akka.kafka.consumer} } listener { parallelism = 1 diff --git a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/MessageBus.scala b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/MessageBus.scala index 34f8eddd..7cbfba24 100644 --- a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/MessageBus.scala +++ b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/MessageBus.scala @@ -63,7 +63,7 @@ object MessageBus { case "kafka" => log.info("Starting messaging mode: Kafka") log.info(s"Using stream name: ${messageLike.streamName}") - KafkaClient.source(system, config, groupId)(messageLike) + KafkaClient.source(config, groupId, groupInstanceId = None)(messageLike) case "local" | "test" => log.info("Using local event bus") LocalMessageBus.subscribe(system, config, op) @@ -83,7 +83,7 @@ object MessageBus { val processingFlow = Flow[T].mapAsync[Any](listenerParallelism)(op) val committerSettings = CommitterSettings(config.getConfig("ats.messaging.kafka.committer")) - KafkaClient.committableSource[T](config, committerSettings, groupIdPrefix, processingFlow).mapMaterializedValue(_ => NotUsed) + 
KafkaClient.committableSource[T](config, committerSettings, groupIdPrefix, groupInstanceId = None, processingFlow).mapMaterializedValue(_ => NotUsed) case "local" | "test" => log.info("Using local event bus") diff --git a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala index b509262a..1a8789e3 100644 --- a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala +++ b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala @@ -11,17 +11,18 @@ import akka.event.Logging import akka.kafka.ConsumerMessage.CommittableMessage import akka.kafka.scaladsl.Consumer.Control import akka.kafka.scaladsl.{Committer, Consumer} -import akka.kafka._ +import akka.kafka.* import akka.stream.scaladsl.{Flow, Sink, Source} import com.advancedtelematic.libats.messaging.MessageBusPublisher import com.advancedtelematic.libats.messaging.metrics.KafkaMetrics import com.advancedtelematic.libats.messaging_datatype.MessageLike import com.typesafe.config.Config -import io.circe.syntax._ +import io.circe.syntax.* import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.serialization.* +import java.time.temporal.ChronoUnit import scala.concurrent.{ExecutionContext, Future, Promise} object KafkaClient { @@ -31,45 +32,55 @@ object KafkaClient { val kafkaProducer = producer(config)(system) new MessageBusPublisher { - override def publish[T](msg: T)(implicit ex: ExecutionContext, messageLike: MessageLike[T]): Future[Unit] = { + override def publish[T](msg: T)( + implicit ex: ExecutionContext, + messageLike: MessageLike[T]): Future[Unit] = { val promise = Promise[RecordMetadata]() val topic = 
topicNameFn(messageLike.streamName) - val record = new ProducerRecord[Array[Byte], String](topic, - messageLike.id(msg).getBytes, msg.asJson(messageLike.encoder).noSpaces) - - kafkaProducer.send(record, new Callback { - override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { - if (exception != null) - promise.failure(exception) - else if (metadata != null) - promise.success(metadata) - else - promise.failure(new Exception("Unknown error occurred, no metadata or error received")) + val record = new ProducerRecord[Array[Byte], String]( + topic, + messageLike.id(msg).getBytes, + msg.asJson(messageLike.encoder).noSpaces) + + //noinspection ConvertExpressionToSAM + kafkaProducer.send( + record, + new Callback { + override def onCompletion(metadata: RecordMetadata, + exception: Exception): Unit = { + if (exception != null) + promise.failure(exception) + else if (metadata != null) + promise.success(metadata) + else + promise.failure( + new Exception( + "Unknown error occurred, no metadata or error received")) + } } - }) + ) promise.future.map(_ => ()) } } } + def source[T](config: Config, groupId: String, groupInstanceId: Option[String])( + implicit ml: MessageLike[T]): Source[T, NotUsed] = + plainSource(config, groupId, groupInstanceId)(ml).mapMaterializedValue(_ => NotUsed) - def source[T](system: ActorSystem, config: Config, groupId: String) - (implicit ml: MessageLike[T]): Source[T, NotUsed] = - plainSource(config, groupId)(ml, system).mapMaterializedValue(_ => NotUsed) - - private def plainSource[T](config: Config, groupIdPrefix: String) - (implicit ml: MessageLike[T], system: ActorSystem): Source[T, Control] = { - val (consumerSettings, subscriptions) = buildSource(config, groupIdPrefix) + private def plainSource[T](config: Config, groupIdPrefix: String, groupInstanceId: Option[String]) + (implicit ml: MessageLike[T]): Source[T, Control] = { + val (consumerSettings, subscriptions) = buildSource(config, groupIdPrefix, groupInstanceId) val 
settings = consumerSettings.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") Consumer.plainSource(settings, subscriptions).map(_.value()).filter(_ != null) } - def committableSource[T](config: Config, committerSettings: CommitterSettings, groupIdPrefix: String, processingFlow: Flow[T, Any, NotUsed]) + def committableSource[T](config: Config, committerSettings: CommitterSettings, groupIdPrefix: String, groupInstanceId: Option[String], processingFlow: Flow[T, Any, NotUsed]) (implicit ml: MessageLike[T], system: ActorSystem): Source[T, Control] = { - val (cfgSettings, subscriptions) = buildSource(config, groupIdPrefix) + val (cfgSettings, subscriptions) = buildSource(config, groupIdPrefix, groupInstanceId) val log = Logging.getLogger(system, this.getClass) val committerSink = Committer.sink(committerSettings) @@ -78,33 +89,42 @@ object KafkaClient { .filter(_.record.value() != null) .map { msg => log.debug(s"Parsed ${msg.record.value()}") ; msg } .alsoTo { - Flow[CommittableMessage[_, T]] + Flow[CommittableMessage[?, T]] .map(_.record.value()) .via(processingFlow) .to(Sink.ignore) } .alsoTo { - Flow[CommittableMessage[_, _]] + Flow[CommittableMessage[?, T]] .map(_.committableOffset) .to(committerSink) } .map(_.record.value()) } - private def buildSource[T, M](config: Config, groupIdPrefix: String) - (implicit system: ActorSystem, ml: MessageLike[M]): (ConsumerSettings[Array[Byte], M], Subscription) = { + // groupInstanceId is used as kafka-client's group.instance.id and makes this consumer a static consumer vs.
dynamic // See kafka docs private def buildSource[M](config: Config, groupIdPrefix: String, groupInstanceId: Option[String] = None) (implicit ml: MessageLike[M]): (ConsumerSettings[Array[Byte], M], Subscription) = { val topicFn = topic(config) val consumerSettings = { val host = config.getString("ats.messaging.kafka.host") val groupId = groupIdPrefix + "-" + topicFn(ml.streamName) - val skipJsonErrors = config.getBoolean("ats.messaging.kafka.skipJsonErrors") + val skipJsonErrors = + config.getBoolean("ats.messaging.kafka.skipJsonErrors") + + val consumerConfig = config.getConfig("ats.messaging.kafka.consumer") - ConsumerSettings(system, new ByteArrayDeserializer, new JsonDeserializer(ml.decoder, throwException = ! skipJsonErrors)) + ConsumerSettings(consumerConfig, + new ByteArrayDeserializer, + new JsonDeserializer(ml.decoder, + throwException = !skipJsonErrors)) .withBootstrapServers(host) .withGroupId(groupId) .withClientId(s"consumer-$groupId") .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") .withProperty("metric.reporters", classOf[KafkaMetrics].getName) + .withGroupInstanceId(groupInstanceId.orNull) } val subscription = Subscriptions.topics(topicFn(ml.streamName)) @@ -114,11 +134,12 @@ object KafkaClient { private[this] def topic(config: Config): String => String = { val suffix = config.getString("ats.messaging.kafka.topicSuffix") - (streamName: String) => streamName + "-" + suffix + (streamName: String) => + streamName + "-" + suffix } - private[this] def producer(config: Config) - (implicit system: ActorSystem): KafkaProducer[Array[Byte], String] = + private[this] def producer(config: Config)( + implicit system: ActorSystem): KafkaProducer[Array[Byte], String] = ProducerSettings.createKafkaProducer( ProducerSettings(system, new ByteArraySerializer, new StringSerializer) .withBootstrapServers(config.getString("ats.messaging.kafka.host")) diff --git
a/libats-messaging/src/test/scala/com/advancedtelematic/libats/messaging/kakfa/KafkaClientIntegrationSpec.scala b/libats-messaging/src/test/scala/com/advancedtelematic/libats/messaging/kakfa/KafkaClientIntegrationSpec.scala index 0597ef96..f9549259 100644 --- a/libats-messaging/src/test/scala/com/advancedtelematic/libats/messaging/kakfa/KafkaClientIntegrationSpec.scala +++ b/libats-messaging/src/test/scala/com/advancedtelematic/libats/messaging/kakfa/KafkaClientIntegrationSpec.scala @@ -70,7 +70,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec")) val testMsg = KafkaSpecMessage2(Instant.now.toString) val flow = Flow[KafkaSpecMessage2].mapAsync(1)((_: KafkaSpecMessage2) => FastFuture.successful(Done)) - val source = KafkaClient.committableSource[KafkaSpecMessage2](system.settings.config, commiterSettings, "kafka-test", flow) + val source = KafkaClient.committableSource[KafkaSpecMessage2](system.settings.config, commiterSettings, "kafka-test", groupInstanceId = None, flow) val msgFuture = source.groupedWithin(10, 5.seconds).runWith(Sink.head) for { @@ -90,7 +90,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec")) """.stripMargin).withFallback(system.settings.config) val flow = Flow[KafkaSpecMessage3].mapAsync(1)((_: KafkaSpecMessage3) => FastFuture.successful(Done)) - val source = KafkaClient.committableSource[KafkaSpecMessage3](cfg, commiterSettings, "kafka-test", flow) + val source = KafkaClient.committableSource[KafkaSpecMessage3](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow) val msgFuture = source.runWith(Sink.head) @@ -117,7 +117,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec")) val cfg = ConfigFactory.parseMap(Map("ats.messaging.kafka.skipJsonErrors" -> false).asJava).withFallback(system.settings.config) val flow = Flow[KafkaSpecMessage0].mapAsync(1)((_: KafkaSpecMessage0) => FastFuture.successful(Done)) - val source = 
KafkaClient.committableSource[KafkaSpecMessage0](cfg, commiterSettings, "kafka-test", flow) + val source = KafkaClient.committableSource[KafkaSpecMessage0](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow) for { _ <- akka.pattern.after(3.seconds)(Future.successful(())) @@ -143,7 +143,7 @@ class KafkaClientIntegrationSpec extends TestKit(ActorSystem("KafkaClientSpec")) val cfg = system.settings.config val flow = Flow[KafkaSpecMessage4].mapAsync(1)((_: KafkaSpecMessage4) => FastFuture.successful(Done)) - val source = KafkaClient.committableSource[KafkaSpecMessage4](cfg, commiterSettings, "kafka-test", flow) + val source = KafkaClient.committableSource[KafkaSpecMessage4](cfg, commiterSettings, "kafka-test", groupInstanceId = None, flow) val testMsg = KafkaSpecMessage4(Instant.now.toString)