Skip to content

Commit

Permalink
Merge pull request #43 from opt-nc/upgrade_kafka_sb
Browse files Browse the repository at this point in the history
feat(upgrade): kafka, spring-boot
  • Loading branch information
mbarre committed Mar 21, 2023
2 parents 7af1486 + 29a4004 commit d4f17d1
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 40 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: 8
java-version: 17
distribution: 'adopt'

- name: Maven version
Expand Down
2 changes: 1 addition & 1 deletion .sdkmanrc
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Enable auto-env through the sdkman_auto_env config
# Add key=value pairs of SDKs to use below
java=8.0.345-tem
java=17.0.0-tem
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ Créer le producer :
kafka-console-producer.sh --broker-list kafka:9092 --topic demo.user --property "parse.key=true" --property "key.separator=|"
```

Copier les lignes **(inclure la dernière ligne vide)** :
Copier les lignes :

```
112233|{"phoneNumber":"112233", "firstName":"Hubert", "lastName":"Bonisseur de la Bath"}
998877|{"phoneNumber":"998877", "firstName":"Jean", "lastName":"Soudajman"}
446655|{"phoneNumber":"446655", "firstName":"Henri", "lastName":"Tathan"}
```

### 2️⃣ Ecriture automatique dans le topic `demo.sms`
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.4</version>
<version>3.0.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>nc.opt</groupId>
Expand Down
17 changes: 12 additions & 5 deletions src/main/docker/kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,37 @@ version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
container_name: zookeeper
ports:
- "2181:2181"
volumes:
- "/home/michele/docker_volumes/kafka/zookeeper_data:/bitnami"
- "/home/user/data/docker_volumes/kafka/zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.2
image: docker.io/bitnami/kafka:3.3
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "/home/michele/docker_volumes/kafka/kafka_data:/bitnami"
- "/home/user/data/docker_volumes/kafka/kafka_data:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,OUTSIDE://127.0.0.1:9094
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://127.0.0.1:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper

red_panda:
image: docker.redpanda.com/vectorized/console:latest
container_name: red_panda
restart: on-failure
ports:
- "8080:8080"
- "8081:8080"
environment:
KAFKA_BROKERS: kafka:9092
depends_on:
Expand Down
57 changes: 33 additions & 24 deletions src/main/java/nc/opt/springkafka/service/KafkaService.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import nc.opt.springkafka.dto.MessageDTO;
import nc.opt.springkafka.dto.SmsDTO;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -15,6 +16,8 @@
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {
Expand All @@ -38,43 +41,39 @@ public KafkaService(KafkaTemplate<String, String> kafkaTemplate) {
// Envoi synchrone du message
// Le producer attend la réponse de Kafka
public SendResult<String, String> push(MessageDTO messageDTO) {
String json = "";
try {
String json = objectMapper.writeValueAsString(messageDTO);
json = objectMapper.writeValueAsString(messageDTO);
String uuid = UUID.randomUUID().toString();

ProducerRecord<String, String> record = new ProducerRecord<>(messageTopic, uuid, json);
final ProducerRecord<String, String> record = new ProducerRecord<>(messageTopic, uuid, json);
return kafkaTemplate.send(record).get();

} catch (Exception e) {
log.error("Erreur lors de l'envoi dans kafka de [{}]", messageDTO);
} catch (ExecutionException e) {
handleFailure(json, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(json, e.getCause());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return null;
}

// Envoi asynchrone du message
public void pushAsync(MessageDTO messageDTO) {

String json = null;
try {
json = objectMapper.writeValueAsString(messageDTO);

final String json = objectMapper.writeValueAsString(messageDTO);
String uuid = UUID.randomUUID().toString();

ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(messageTopic, json);

String finalJson = json;
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + finalJson +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(messageTopic, uuid, json);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(json, result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ finalJson + "] due to : " + ex.getMessage());
else {
handleFailure(json, ex.getCause());
}
});

Expand All @@ -83,17 +82,27 @@ public void onFailure(Throwable ex) {
}
}

private void handleSuccess(String data, SendResult<String, String> result){
System.out.println("Sent message=[" + data +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}

private void handleFailure(String data, Throwable ex){
System.out.println("Unable to send message=["
+ data + "] due to : " + ex.getMessage());
}

public SendResult<String, String> pushSms(SmsDTO smsDTO) {
try {
try {
String json = objectMapper.writeValueAsString(smsDTO);
String key = smsDTO.getPhoneNumberEmitter();

ProducerRecord<String, String> record = new ProducerRecord<>(smsTopic, key, json);
return kafkaTemplate.send(record).get();

} catch (Exception e) {
} catch (Exception e) {
log.error("Erreur lors de l'envoi de SMS dans kafka de [{}]", smsDTO);
throw new RuntimeException(e);
}
}
}
}
3 changes: 3 additions & 0 deletions src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# This configuration overrides the application.yml file.
# ===================================================================


logging:
level:
ROOT: INFO
Expand All @@ -16,3 +17,5 @@ spring:
streams:
bootstrap-servers: localhost:9092
state-dir: ./state-dir/kafka
server:
port: 8081
10 changes: 5 additions & 5 deletions src/test/java/nc/opt/springkafka/KafkaServiceTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package nc.opt.springkafka;

import nc.opt.springkafka.service.KafkaService;
import nc.opt.springkafka.dto.MessageDTO;
import nc.opt.springkafka.service.KafkaService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
Expand All @@ -18,9 +18,9 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.junit.Assert;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void push() {
String key = result.getProducerRecord().key();

embeddedKafka.consumeFromEmbeddedTopics(consumer, messageTopic);
final ConsumerRecord<String, String> record = getSingleRecord(consumer, messageTopic, 10_000);
final ConsumerRecord<String, String> record = getSingleRecord(consumer, messageTopic, Duration.of(10_000, ChronoUnit.MILLIS));

assertThat(record, hasValue(value));
assertThat(record, hasKey(key));
Expand Down

0 comments on commit d4f17d1

Please sign in to comment.