diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 6e76870d..6f83e504 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -146,7 +146,7 @@ public void start(EventStore eventStore, Concur eventStore, Option.none(), bufferTimeout(200, Duration.ofMillis(20)), - bufferTimeout(200, Duration.ofMillis(20)) + bufferTimeout(200, Duration.ofSeconds(1)) )); }) .doOnError(e -> { diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java index 28cd1b08..8baef21b 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java @@ -8,6 +8,7 @@ import io.vavr.Tuple0; import io.vavr.Tuple3; import io.vavr.Value; +import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.collection.Seq; @@ -84,19 +85,33 @@ public CompletionStage>> errors = commandsAndResults .map(Tuple3::_3) - .filter(Either::isLeft) - .map(e -> Either.left(e.swap().get())); - - // Extract success and generate envelopes for each result - CompletionStage> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> { - C command = t._1; - Option mayBeState = t._2; - List events = t._3.get().events.toList(); - return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> { - Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); - return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum); - }); - }); + .flatMap(Either::swap) + .map(Either::left); + + Map commandsById = commandsAndResults.flatMap(t -> t._3.map(any -> t._1)).groupBy(c -> c.entityId().get()).mapValues(List::head); + Map messageById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), any.message)))); + Map> statesById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), t._2)))); + List allEvents = commandsAndResults.flatMap(t -> t._3.map(ev -> ev.events)).flatMap(identity()); + Map> eventsById = allEvents.groupBy(Event::entityId); + + CompletionStage> success = eventStore.nextSequences(ctx, allEvents.size()) + .thenApply(sequences -> + buildEnvelopes(ctx, commandsById, sequences, allEvents) + ) + .thenApply(allEnvelopes -> { + Map>> indexed = allEnvelopes.groupBy(env -> env.entityId); + return indexed.map(t -> { + String entityId = t._1; + List> eventEnvelopes = t._2; + C command = commandsById.get(entityId).get(); + Option mayBeState = statesById.get(entityId).get(); + List events = eventsById.getOrElse(entityId, List.empty()); + Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); + Message message = messageById.get(entityId).get(); + return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, message, mayBeLastSeqNum); + }).toList(); + + }); return success.thenApply(s -> Tuple(s.toList(), errors)); }) @@ -172,13 +187,11 @@ public CompletionStage, Either t._1); } - CompletionStage>> buildEnvelopes(TxCtx tx, C command, List events) { + List> buildEnvelopes(TxCtx tx, Map commands, List sequences, List events) { String transactionId = transactionManager.transactionId(); int nbMessages = events.length(); - return eventStore.nextSequences(tx, events.size()).thenApply(s -> - events.zip(s).zipWithIndex().map(t -> - buildEnvelope(tx, command, t._1._1, t._1._2, t._2, nbMessages, transactionId) - ) + return events.zip(sequences).zipWithIndex().map(t -> + buildEnvelope(tx, commands.get(t._1._1.entityId()).get(), t._1._1, t._1._2, t._2, nbMessages, transactionId) ); } diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java index 8bdcff63..af0132b1 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java @@ -7,7 +7,8 @@ import io.vavr.collection.Map; import io.vavr.collection.Traversable; import io.vavr.control.Option; -import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -15,6 +16,7 @@ public class DefaultReactorAggregateStore, E extends Event, Meta, Context, TxCtx> implements ReactorAggregateStore { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReactorAggregateStore.class); private final ReactorEventStore eventStore; private final EventHandler eventEventHandler; @@ -44,12 +46,11 @@ public Mono>> getAggregates(TxCtx txCtx, List enti empty, (Map> states, EventEnvelope event) -> { Option mayBeCurrentState = states.get(event.entityId).flatMap(identity()); - return states.put( - event.entityId, - this.eventEventHandler - .applyEvent(mayBeCurrentState, event.event) - .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)) - ); + Option newState = this.eventEventHandler + .applyEvent(mayBeCurrentState, event.event) + .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)); + LOGGER.debug("Applying {} to {} : \n -> {}", event.event, mayBeCurrentState, newState); + return states.put(event.entityId, newState); } ); }); diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorEventStore.java index fe3001aa..a646b01b 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorEventStore.java @@ -54,6 +54,26 @@ public Mono> markAsPublished(EventEnvelope eventStore.markAsPublished(eventEnvelope)); } + @Override + public Mono>> markAsPublished(List> eventEnvelopes) { + return Mono.fromCompletionStage(() -> eventStore.markAsPublished(eventEnvelopes)); + } + + @Override + public Mono>> markAsPublished(TxCtx tx, List> eventEnvelopes) { + return Mono.fromCompletionStage(() -> eventStore.markAsPublished(tx, eventEnvelopes)); + } + + @Override + public Flux> loadEvents(String id) { + return Flux.from(eventStore.loadEvents(id)); + } + + @Override + public Flux> loadAllEvents() { + return Flux.from(eventStore.loadAllEvents()); + } + @Override public Mono openTransaction() { return Mono.fromCompletionStage(() -> eventStore.openTransaction()); diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index ddd4c6c7..036ea74e 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -358,6 +358,17 @@ public CompletionStage>> markAsPublished(Tx ).thenApply(__ -> eventEnvelopes.map(eventEnvelope -> eventEnvelope.copy().withPublished(true).build())); } + @Override + public CompletionStage>> markAsPublished(List> eventEnvelopes) { + return simpleDb.execute(dsl -> dsl + .update(table(this.tableNames.tableName)) + .set(PUBLISHED, true) + .where(ID.in(eventEnvelopes.map(evt -> evt.id).toJavaArray(UUID[]::new))) + ).thenApply(__ -> eventEnvelopes.map(eventEnvelope -> eventEnvelope.copy().withPublished(true).build())); + } + + + @Override public void close() throws IOException {