Skip to content

Commit

Permalink
Optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 17, 2024
1 parent d1b12ac commit 0e5cbcc
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
eventStore,
Option.none(),
bufferTimeout(200, Duration.ofMillis(20)),
bufferTimeout(200, Duration.ofMillis(20))
bufferTimeout(200, Duration.ofSeconds(1))
));
})
.doOnError(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.vavr.Tuple0;
import io.vavr.Tuple3;
import io.vavr.Value;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.collection.Seq;
Expand Down Expand Up @@ -84,19 +85,33 @@ public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<
// Extract errors from command handling
List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = commandsAndResults
.map(Tuple3::_3)
.filter(Either::isLeft)
.map(e -> Either.left(e.swap().get()));

// Extract success and generate envelopes for each result
CompletionStage<List<CommandStateAndEvent>> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> {
C command = t._1;
Option<S> mayBeState = t._2;
List<E> events = t._3.get().events.toList();
return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> {
Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum);
});
});
.flatMap(Either::swap)
.map(Either::left);

Map<String, C> commandsById = commandsAndResults.flatMap(t -> t._3.map(any -> t._1)).groupBy(c -> c.entityId().get()).mapValues(List::head);
Map<String, Message> messageById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), any.message))));
Map<String, Option<S>> statesById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), t._2))));
List<E> allEvents = commandsAndResults.flatMap(t -> t._3.map(ev -> ev.events)).flatMap(identity());
Map<String, List<E>> eventsById = allEvents.groupBy(Event::entityId);

CompletionStage<List<CommandStateAndEvent>> success = eventStore.nextSequences(ctx, allEvents.size())
.thenApply(sequences ->
buildEnvelopes(ctx, commandsById, sequences, allEvents)
)
.thenApply(allEnvelopes -> {
Map<String, List<EventEnvelope<E, Meta, Context>>> indexed = allEnvelopes.groupBy(env -> env.entityId);
return indexed.map(t -> {
String entityId = t._1;
List<EventEnvelope<E, Meta, Context>> eventEnvelopes = t._2;
C command = commandsById.get(entityId).get();
Option<S> mayBeState = statesById.get(entityId).get();
List<E> events = eventsById.getOrElse(entityId, List.empty());
Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
Message message = messageById.get(entityId).get();
return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, message, mayBeLastSeqNum);
}).toList();

});

return success.thenApply(s -> Tuple(s.toList(), errors));
})
Expand Down Expand Up @@ -172,13 +187,11 @@ public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message
).thenApply(t -> t._1);
}

CompletionStage<List<EventEnvelope<E, Meta, Context>>> buildEnvelopes(TxCtx tx, C command, List<E> events) {
List<EventEnvelope<E, Meta, Context>> buildEnvelopes(TxCtx tx, Map<String, C> commands, List<Long> sequences, List<E> events) {
String transactionId = transactionManager.transactionId();
int nbMessages = events.length();
return eventStore.nextSequences(tx, events.size()).thenApply(s ->
events.zip(s).zipWithIndex().map(t ->
buildEnvelope(tx, command, t._1._1, t._1._2, t._2, nbMessages, transactionId)
)
return events.zip(sequences).zipWithIndex().map(t ->
buildEnvelope(tx, commands.get(t._1._1.entityId()).get(), t._1._1, t._1._2, t._2, nbMessages, transactionId)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
import io.vavr.collection.Map;
import io.vavr.collection.Traversable;
import io.vavr.control.Option;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static java.util.function.Function.identity;

public class DefaultReactorAggregateStore<S extends State<S>, E extends Event, Meta, Context, TxCtx> implements ReactorAggregateStore<S, String, TxCtx> {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReactorAggregateStore.class);

private final ReactorEventStore<TxCtx, E, Meta, Context> eventStore;
private final EventHandler<S, E> eventEventHandler;
Expand Down Expand Up @@ -44,12 +46,11 @@ public Mono<Map<String, Option<S>>> getAggregates(TxCtx txCtx, List<String> enti
empty,
(Map<String, Option<S>> states, EventEnvelope<E, Meta, Context> event) -> {
Option<S> mayBeCurrentState = states.get(event.entityId).flatMap(identity());
return states.put(
event.entityId,
this.eventEventHandler
.applyEvent(mayBeCurrentState, event.event)
.map((S state) -> (S) state.withSequenceNum(event.sequenceNum))
);
Option<S> newState = this.eventEventHandler
.applyEvent(mayBeCurrentState, event.event)
.map((S state) -> (S) state.withSequenceNum(event.sequenceNum));
LOGGER.debug("Applying {} to {} : \n -> {}", event.event, mayBeCurrentState, newState);
return states.put(event.entityId, newState);
}
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ public Mono<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Me
return Mono.fromCompletionStage(() -> eventStore.markAsPublished(eventEnvelope));
}

@Override
public Mono<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEnvelope<E, Meta, Context>> eventEnvelopes) {
return Mono.fromCompletionStage(() -> eventStore.markAsPublished(eventEnvelopes));
}

@Override
public Mono<List<EventEnvelope<E, Meta, Context>>> markAsPublished(TxCtx tx, List<EventEnvelope<E, Meta, Context>> eventEnvelopes) {
return Mono.fromCompletionStage(() -> eventStore.markAsPublished(tx, eventEnvelopes));
}

@Override
public Flux<EventEnvelope<E, Meta, Context>> loadEvents(String id) {
return Flux.from(eventStore.loadEvents(id));
}

@Override
public Flux<EventEnvelope<E, Meta, Context>> loadAllEvents() {
return Flux.from(eventStore.loadAllEvents());
}

@Override
public Mono<TxCtx> openTransaction() {
return Mono.fromCompletionStage(() -> eventStore.openTransaction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,17 @@ public CompletionStage<List<EventEnvelope<E, Meta, Context>>> markAsPublished(Tx
).thenApply(__ -> eventEnvelopes.map(eventEnvelope -> eventEnvelope.copy().withPublished(true).build()));
}

@Override
public CompletionStage<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEnvelope<E, Meta, Context>> eventEnvelopes) {
return simpleDb.execute(dsl -> dsl
.update(table(this.tableNames.tableName))
.set(PUBLISHED, true)
.where(ID.in(eventEnvelopes.map(evt -> evt.id).toJavaArray(UUID[]::new)))
).thenApply(__ -> eventEnvelopes.map(eventEnvelope -> eventEnvelope.copy().withPublished(true).build()));
}



@Override
public void close() throws IOException {

Expand Down

0 comments on commit 0e5cbcc

Please sign in to comment.