Expose a way to force unpublished events to be published
larousso committed Aug 1, 2024
1 parent 94277b0 commit dfb0296
Showing 6 changed files with 108 additions and 74 deletions.
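The new publishNonAcknowledgedFromDb method added to EventPublisher in this commit (diffs below) lets callers force a replay of events that are still marked as unpublished in the journal, without going through the publisher's start() loop. A minimal caller sketch, assuming the fr.maif.eventsourcing types from this commit; the RepublishTask wrapper and its wiring are hypothetical, and WAIT mirrors the default strategy the publishers fall back to when none is supplied:

import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple0;

import java.util.concurrent.CompletionStage;

// Hypothetical maintenance task that forces republication of events still
// flagged as unpublished in the journal.
public class RepublishTask<TxCtx, E extends Event, Meta, Context> {

    private final EventPublisher<E, Meta, Context> publisher;
    private final EventStore<TxCtx, E, Meta, Context> eventStore;

    public RepublishTask(EventPublisher<E, Meta, Context> publisher,
                         EventStore<TxCtx, E, Meta, Context> eventStore) {
        this.publisher = publisher;
        this.eventStore = eventStore;
    }

    // WAIT is the same fallback the publishers use when no strategy is supplied.
    public CompletionStage<Tuple0> forceRepublish() {
        return publisher.publishNonAcknowledgedFromDb(eventStore, EventStore.ConcurrentReplayStrategy.WAIT);
    }
}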
@@ -147,6 +147,11 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
return _this.publish(events);
}

@Override
public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
return CompletionStages.completedStage(Tuple.empty());
}

@Override
public void close() throws IOException {

@@ -79,32 +79,20 @@ public KafkaEventPublisher(ActorSystem system, ProducerSettings<String, EventEnv
this.eventsSource = pair.second();
}

@Override
public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
return republishFromDBSource(eventStore, concurrentReplayStrategy).runWith(Sink.ignore(), materializer)
.thenApply(d -> Tuple.empty());
}

@Override
public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
killSwitch = RestartSource
.onFailuresWithBackoff(
RestartSettings.create(restartInterval, maxRestartInterval, 0),
() -> {
LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", topic);
return Source.completionStage(eventStore.openTransaction().toCompletableFuture())
.flatMapConcat(tx -> {

LOGGER.info("Replaying not published in DB for {}", topic);
ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
return Source.fromPublisher(eventStore.loadEventsUnpublished(tx, strategy))
.via(publishToKafka(eventStore, Option.some(tx), groupFlow))
.alsoTo(logProgress(100))
.watchTermination((nu, cs) ->
cs.whenComplete((d, e) -> {
eventStore.commitOrRollback(Option.of(e), tx);
if (e != null) {
LOGGER.error("Error replaying non published events to kafka for "+topic, e);
} else {
LOGGER.info("Replaying events not published in DB is finished for {}", topic);
}
})
);
})
return republishFromDBSource(eventStore, concurrentReplayStrategy)
.concat(
this.eventsSource.via(publishToKafka(
eventStore,
@@ -131,6 +119,28 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
.run(materializer).first();
}

private <TxCtx> Source<EventEnvelope<E, Meta, Context>, NotUsed> republishFromDBSource(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
return Source.completionStage(eventStore.openTransaction().toCompletableFuture())
.flatMapConcat(tx -> {

LOGGER.info("Replaying not published in DB for {}", topic);
ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
return Source.fromPublisher(eventStore.loadEventsUnpublished(tx, strategy))
.via(publishToKafka(eventStore, Option.some(tx), groupFlow))
.alsoTo(logProgress(100))
.watchTermination((nu, cs) ->
cs.whenComplete((d, e) -> {
eventStore.commitOrRollback(Option.of(e), tx);
if (e != null) {
LOGGER.error("Error replaying non published events to kafka for " + topic, e);
} else {
LOGGER.info("Replaying events not published in DB is finished for {}", topic);
}
})
);
});
}


private <TxCtx> Flow<EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>, NotUsed> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> tx, Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> groupFlow) {
Flow<ProducerMessage.Envelope<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, NotUsed> publishToKafkaFlow = Producer.<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>flexiFlow(producerSettings);
@@ -185,6 +185,11 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
return _this.publish(events);
}

@Override
public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
return CompletionStages.completedStage(Tuple.empty());
}

@Override
public void close() throws IOException {

@@ -30,7 +30,6 @@
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

@@ -44,10 +43,7 @@ public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implemen
private final String topic;
private final Integer queueBufferSize;

private final AtomicReference<Sinks.Many<EventEnvelope<E, Meta, Context>>> queue = new AtomicReference<>();
// private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
private final AtomicReference<Flux<EventEnvelope<E, Meta, Context>>> eventSource = new AtomicReference<>();
// private final Flux<EventEnvelope<E, Meta, Context>> eventSource;
private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
private final Duration restartInterval;
private final Duration maxRestartInterval;
@@ -68,23 +64,11 @@ public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, C
this.restartInterval = restartInterval == null ? Duration.of(1, ChronoUnit.SECONDS) : restartInterval;
this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1, ChronoUnit.MINUTES) : maxRestartInterval;



// this.queue = Sinks.many().multicast().onBackpressureBuffer(queueBufferSize1, true); //replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1);
// this.eventSource = queue.asFlux();
reinitQueue();
this.queue = Sinks.many().multicast().onBackpressureBuffer(this.queueBufferSize, false);
this.senderOptions = senderOptions.stopOnError(true);
this.kafkaSender = KafkaSender.create(senderOptions);
}

private void reinitQueue() {
if (this.queue.get() != null) {
this.queue.get().tryEmitComplete();
}
this.queue.set(Sinks.many().unicast().onBackpressureBuffer()); //replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1);
this.eventSource.set(queue.get().asFlux());
}

record CountAndMaxSeqNum(Long count, Long lastSeqNum) {
static CountAndMaxSeqNum empty() {
return new CountAndMaxSeqNum(0L, 0L);
@@ -110,7 +94,44 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur

Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink = Sinks.many().unicast().onBackpressureBuffer();
logProgress(logProgressSink.asFlux(), 100).subscribe();
killSwitch = Mono.defer(() -> fromCS(eventStore::openTransaction)
killSwitch = publishFromDb(eventStore, concurrentReplayStrategy, logProgressSink)
.concatMap(countAndLastSeqNum -> {
// Flux.defer(() -> {
LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum);
return queue.asFlux()
.filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum)
.transform(publishToKafka(
eventStore,
Option.none(),
bufferTimeout(200, Duration.ofMillis(20)),
bufferTimeout(200, Duration.ofSeconds(1))
));
})
.doOnError(e -> {
LOGGER.error("Error publishing events to kafka", e);
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
.transientErrors(true)
.maxBackoff(maxRestartInterval)
.doBeforeRetry(ctx -> {
LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure());
})
)
.subscribe();
}

@Override
public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {
Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink = Sinks.many().unicast().onBackpressureBuffer();
logProgress(logProgressSink.asFlux(), 100).subscribe();
return publishFromDb(eventStore, concurrentReplayStrategy, logProgressSink)
.collectList()
.map(any -> Tuple.empty())
.toFuture();
}

private <TxCtx> Flux<CountAndMaxSeqNum> publishFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy, Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink) {
return Mono.defer(() -> fromCS(eventStore::openTransaction)
.flatMap(tx -> {
LOGGER.info("Replaying events not published from DB in topic {}", topic);
ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
@@ -136,31 +157,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
}
});
}))
.flux()
.concatMap(countAndLastSeqNum -> {
// Flux.defer(() -> {
LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum);
return eventSource.get()
.filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum)
.transform(publishToKafka(
eventStore,
Option.none(),
bufferTimeout(200, Duration.ofMillis(20)),
bufferTimeout(200, Duration.ofSeconds(1))
));
})
.doOnError(e -> {
reinitQueue();
LOGGER.error("Error publishing events to kafka", e);
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
.transientErrors(true)
.maxBackoff(maxRestartInterval)
.doBeforeRetry(ctx -> {
LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure());
})
)
.subscribe();
.flux();
}

private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore,
@@ -197,7 +194,7 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
return Flux
.fromIterable(events)
.map(t -> {
queue.get().tryEmitNext(t).orThrow();
queue.tryEmitNext(t).orThrow();
return Tuple.empty();
})
.retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1))
@@ -248,10 +245,4 @@ private <Any> Flux<Integer> logProgress(Flux<Any> logProgress, int every) {
}
});
}

public Integer getBufferedElementCount() {
// return this.queue.scan(Scannable.Attr.BUFFERED);
return 0;
}

}
@@ -1,16 +1,15 @@
package fr.maif.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.concurrent.Future;

import java.io.Closeable;
import java.util.concurrent.CompletionStage;

public interface EventPublisher<E extends Event, Meta, Context> extends Closeable {
CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events);

default <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
}
<TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy);

default <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {}
}
@@ -5,6 +5,10 @@
import io.vavr.control.Option;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.util.hashing.MurmurHash3$;

import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Function;


public interface ReactorEventStore<TxCtx, E extends Event, Meta, Context> {
@@ -51,6 +55,26 @@ default Mono<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEn

Mono<Tuple0> commitOrRollback(Option<Throwable> of, TxCtx tx);

/**
* Stream elements from the journal and execute a handling function concurrently.
* Events are sharded by entity id, so events for the same entity won't be handled concurrently.
*
* @param fromSequenceNum sequence num to start with
* @param parallelism concurrency factor
* @param maxEventsToHandle limit to n events
* @param handle the handling function, for example to build a new projection
* @return the last sequence num handled
*/
default Mono<Long> concurrentReplay(Long fromSequenceNum, Integer parallelism, Option<Integer> maxEventsToHandle, Function<Flux<EventEnvelope<E, Meta, Context>>, Mono<Tuple0>> handle) {
LongAccumulator lastSeqNum = new LongAccumulator(Long::max, 0);
EventStore.Query.Builder tmpQuery = EventStore.Query.builder().withSequenceFrom(fromSequenceNum);
return this.loadEventsByQuery(maxEventsToHandle.fold(() -> tmpQuery, tmpQuery::withSize).build())
.groupBy(evt -> MurmurHash3$.MODULE$.stringHash(evt.entityId) % parallelism)
.flatMap(flux -> handle.apply(flux.doOnNext(evt -> lastSeqNum.accumulate(evt.sequenceNum))), parallelism)
.last()
.map(any -> lastSeqNum.get());
}

EventStore<TxCtx, E, Meta, Context> toEventStore();

static <TxCtx, E extends Event, Meta, Context> ReactorEventStore<TxCtx, E, Meta, Context> fromEventStore(EventStore<TxCtx, E, Meta, Context> eventStore) {
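The concurrentReplay helper added above shards the journal by entity id hash, so events of a given entity are handled in order while up to `parallelism` groups are processed concurrently. A minimal usage sketch for rebuilding a projection, assuming ReactorEventStore and EventEnvelope live in fr.maif.eventsourcing; MyEvent, Meta, Context, Tx and updateProjection are hypothetical application-side placeholders:

import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.ReactorEventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.control.Option;
import reactor.core.publisher.Mono;

public class ProjectionRebuild {

    // Replay the whole journal through a hypothetical projection writer and
    // return the last sequence number that was handled.
    static Mono<Long> rebuild(ReactorEventStore<Tx, MyEvent, Meta, Context> store) {
        return store.concurrentReplay(
                0L,            // start from the beginning of the journal
                4,             // four concurrent shards, keyed by entity id hash
                Option.none(), // no cap on the number of events to handle
                events -> events
                        .concatMap(ProjectionRebuild::updateProjection) // events of one entity stay ordered
                        .then(Mono.just(Tuple.empty())));
    }

    // Placeholder handler: write one event into a read model.
    static Mono<Tuple0> updateProjection(EventEnvelope<MyEvent, Meta, Context> envelope) {
        return Mono.just(Tuple.empty());
    }
}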
