diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 8d55494..f70d1b9 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -12,10 +12,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Set up JDK 8 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: 8 + java-version: 17 distribution: 'adopt' - name: Maven version diff --git a/.sdkmanrc b/.sdkmanrc old mode 100644 new mode 100755 index 95cd27d..fdfdc92 --- a/.sdkmanrc +++ b/.sdkmanrc @@ -1,3 +1,3 @@ # Enable auto-env through the sdkman_auto_env config # Add key=value pairs of SDKs to use below -java=8.0.345-tem +java=17.0.0-tem diff --git a/README.md b/README.md index 1e814dc..dc7034e 100644 --- a/README.md +++ b/README.md @@ -47,13 +47,12 @@ Créer le producer : kafka-console-producer.sh --broker-list kafka:9092 --topic demo.user --property "parse.key=true" --property "key.separator=|" ``` -Copier les lignes **(inclure la dernière ligne vide)** : +Copier les lignes : ``` 112233|{"phoneNumber":"112233", "firstName":"Hubert", "lastName":"Bonisseur de la Bath"} 998877|{"phoneNumber":"998877", "firstName":"Jean", "lastName":"Soudajman"} 446655|{"phoneNumber":"446655", "firstName":"Henri", "lastName":"Tathan"} - ``` ### 2️⃣ Ecriture automatique dans le topic `demo.sms` diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index 5e3cb58..66b5ecf --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.7.4 + 3.0.4 nc.opt diff --git a/src/main/docker/kafka.yml b/src/main/docker/kafka.yml index f2d7c1a..34e24a8 100644 --- a/src/main/docker/kafka.yml +++ b/src/main/docker/kafka.yml @@ -3,20 +3,26 @@ version: "2" services: zookeeper: image: docker.io/bitnami/zookeeper:3.8 + container_name: zookeeper ports: - "2181:2181" volumes: - - "/home/michele/docker_volumes/kafka/zookeeper_data:/bitnami" + - "/home/user/data/docker_volumes/kafka/zookeeper_data:/bitnami" environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: - image: docker.io/bitnami/kafka:3.2 + image: docker.io/bitnami/kafka:3.3 + container_name: kafka ports: - "9092:9092" - - "9093:9093" volumes: - - "/home/michele/docker_volumes/kafka/kafka_data:/bitnami" + - "/home/user/data/docker_volumes/kafka/kafka_data:/bitnami" environment: + - KAFKA_BROKER_ID=1 + - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,OUTSIDE://127.0.0.1:9094 + - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://127.0.0.1:9094 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: @@ -24,9 +30,10 @@ services: red_panda: image: docker.redpanda.com/vectorized/console:latest + container_name: red_panda restart: on-failure ports: - - "8080:8080" + - "8081:8080" environment: KAFKA_BROKERS: kafka:9092 depends_on: diff --git a/src/main/java/nc/opt/springkafka/service/KafkaService.java b/src/main/java/nc/opt/springkafka/service/KafkaService.java old mode 100644 new mode 100755 index e5a3601..1016ecd --- a/src/main/java/nc/opt/springkafka/service/KafkaService.java +++ b/src/main/java/nc/opt/springkafka/service/KafkaService.java @@ -5,6 +5,7 @@ import nc.opt.springkafka.dto.MessageDTO; import nc.opt.springkafka.dto.SmsDTO; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -15,6 +16,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; @Service public class KafkaService { @@ -38,43 +41,39 @@ public KafkaService(KafkaTemplate kafkaTemplate) { // Envoi synchrone du message // Le producer attend la réponse de Kafka public SendResult push(MessageDTO messageDTO) { + String json = ""; try { - String json = objectMapper.writeValueAsString(messageDTO); + json = objectMapper.writeValueAsString(messageDTO); String uuid = UUID.randomUUID().toString(); - ProducerRecord record = new ProducerRecord<>(messageTopic, uuid, json); + final ProducerRecord record = new ProducerRecord<>(messageTopic, uuid, json); return kafkaTemplate.send(record).get(); - } catch (Exception e) { - log.error("Erreur lors de l'envoi dans kafka de [{}]", messageDTO); + } catch (ExecutionException e) { + handleFailure(json, e.getCause()); + } + catch (TimeoutException | InterruptedException e) { + handleFailure(json, e.getCause()); + } catch (JsonProcessingException e) { throw new RuntimeException(e); } + return null; } // Envoi asynchrone du message public void pushAsync(MessageDTO messageDTO) { - String json = null; try { - json = objectMapper.writeValueAsString(messageDTO); - + final String json = objectMapper.writeValueAsString(messageDTO); String uuid = UUID.randomUUID().toString(); - ListenableFuture> future = - kafkaTemplate.send(messageTopic, json); - - String finalJson = json; - future.addCallback(new ListenableFutureCallback>() { - - @Override - public void onSuccess(SendResult result) { - System.out.println("Sent message=[" + finalJson + - "] with offset=[" + result.getRecordMetadata().offset() + "]"); + CompletableFuture> future = kafkaTemplate.send(messageTopic, uuid, json); + future.whenComplete((result, ex) -> { + if (ex == null) { + handleSuccess(json, result); } - @Override - public void onFailure(Throwable ex) { - System.out.println("Unable to send message=[" - + finalJson + "] due to : " + ex.getMessage()); + else { + handleFailure(json, ex.getCause()); } }); @@ -83,17 +82,27 @@ public void onFailure(Throwable ex) { } } + private void handleSuccess(String data, SendResult result){ + System.out.println("Sent message=[" + data + + "] with offset=[" + result.getRecordMetadata().offset() + "]"); + } + + private void handleFailure(String data, Throwable ex){ + System.out.println("Unable to send message=[" + + data + "] due to : " + ex.getMessage()); + } + public SendResult pushSms(SmsDTO smsDTO) { - try { + try { String json = objectMapper.writeValueAsString(smsDTO); String key = smsDTO.getPhoneNumberEmitter(); ProducerRecord record = new ProducerRecord<>(smsTopic, key, json); return kafkaTemplate.send(record).get(); - } catch (Exception e) { + } catch (Exception e) { log.error("Erreur lors de l'envoi de SMS dans kafka de [{}]", smsDTO); throw new RuntimeException(e); - } + } } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index f3bb517..ffa139b 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -5,6 +5,7 @@ # This configuration overrides the application.yml file. # =================================================================== + logging: level: ROOT: INFO @@ -16,3 +17,5 @@ spring: streams: bootstrap-servers: localhost:9092 state-dir: ./state-dir/kafka +server: + port: 8081 \ No newline at end of file diff --git a/src/test/java/nc/opt/springkafka/KafkaServiceTest.java b/src/test/java/nc/opt/springkafka/KafkaServiceTest.java index ae66585..a49196f 100644 --- a/src/test/java/nc/opt/springkafka/KafkaServiceTest.java +++ b/src/test/java/nc/opt/springkafka/KafkaServiceTest.java @@ -1,13 +1,13 @@ package nc.opt.springkafka; -import nc.opt.springkafka.service.KafkaService; import nc.opt.springkafka.dto.MessageDTO; +import nc.opt.springkafka.service.KafkaService; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.Assert; import org.junit.jupiter.api.Test; -import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; @@ -18,9 +18,9 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; -import org.junit.Assert; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; @@ -64,7 +64,7 @@ public void push() { String key = result.getProducerRecord().key(); embeddedKafka.consumeFromEmbeddedTopics(consumer, messageTopic); - final ConsumerRecord record = getSingleRecord(consumer, messageTopic, 10_000); + final ConsumerRecord record = getSingleRecord(consumer, messageTopic, Duration.of(10_000, ChronoUnit.MILLIS)); assertThat(record, hasValue(value)); assertThat(record, hasKey(key));