Skip to content

Commit

Permalink
Merge pull request #2211 from smallrye/revert-2209-fix-synchronizatio…
Browse files Browse the repository at this point in the history
…n-in-internal-channel-registry

Revert "Fix the synchronization protocol in InternalChannelRegistry"
  • Loading branch information
cescoffier authored Jul 4, 2023
2 parents d7e5758 + 241397f commit 31375e6
Showing 1 changed file with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;

Expand All @@ -18,13 +17,13 @@
@ApplicationScoped
public class InternalChannelRegistry implements ChannelRegistry {

private final Map<String, List<Flow.Publisher<? extends Message<?>>>> publishers = new ConcurrentHashMap<>();
private final Map<String, List<Flow.Subscriber<? extends Message<?>>>> subscribers = new ConcurrentHashMap<>();
private final Map<String, List<Flow.Publisher<? extends Message<?>>>> publishers = new HashMap<>();
private final Map<String, List<Flow.Subscriber<? extends Message<?>>>> subscribers = new HashMap<>();

private final Map<String, Boolean> outgoing = new ConcurrentHashMap<>();
private final Map<String, Boolean> incoming = new ConcurrentHashMap<>();
private final Map<String, Boolean> outgoing = new HashMap<>();
private final Map<String, Boolean> incoming = new HashMap<>();

private final Map<Class<?>, Map<String, Object>> emitters = new ConcurrentHashMap<>();
private final Map<Class<?>, Map<String, Object>> emitters = new HashMap<>();

@Override
public Flow.Publisher<? extends Message<?>> register(String name,
Expand All @@ -37,7 +36,7 @@ public Flow.Publisher<? extends Message<?>> register(String name,
}

@Override
public Flow.Subscriber<? extends Message<?>> register(String name,
public synchronized Flow.Subscriber<? extends Message<?>> register(String name,
Flow.Subscriber<? extends Message<?>> subscriber, boolean merge) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(subscriber, msg.subscriberMustBeSet());
Expand All @@ -47,48 +46,47 @@ public Flow.Subscriber<? extends Message<?>> register(String name,
}

@Override
public void register(String name, Emitter<?> emitter) {
public synchronized void register(String name, Emitter<?> emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
register(name, Emitter.class, emitter);
}

@Override
public void register(String name, MutinyEmitter<?> emitter) {
public synchronized void register(String name, MutinyEmitter<?> emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
register(name, MutinyEmitter.class, emitter);
}

@Override
public <T> void register(String name, Class<T> emitterType, T emitter) {
public synchronized <T> void register(String name, Class<T> emitterType, T emitter) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Objects.requireNonNull(emitter, msg.emitterMustBeSet());
Map<String, Object> map = emitters.computeIfAbsent(emitterType, key -> new HashMap<>());
map.put(name, emitter);
}

@Override
public List<Flow.Publisher<? extends Message<?>>> getPublishers(String name) {
public synchronized List<Flow.Publisher<? extends Message<?>>> getPublishers(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return publishers.getOrDefault(name, Collections.emptyList());
}

@Override
public Emitter<?> getEmitter(String name) {
public synchronized Emitter<?> getEmitter(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return getEmitter(name, Emitter.class);
}

@Override
public MutinyEmitter<?> getMutinyEmitter(String name) {
public synchronized MutinyEmitter<?> getMutinyEmitter(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return getEmitter(name, MutinyEmitter.class);
}

@SuppressWarnings("unchecked")
@Override
public <T> T getEmitter(String name, Class<? super T> emitterType) {
public synchronized <T> T getEmitter(String name, Class<? super T> emitterType) {
Objects.requireNonNull(name, msg.nameMustBeSet());
Map<String, Object> typedEmitters = emitters.get(emitterType);
if (typedEmitters == null) {
Expand All @@ -99,7 +97,7 @@ public <T> T getEmitter(String name, Class<? super T> emitterType) {
}

@Override
public List<Flow.Subscriber<? extends Message<?>>> getSubscribers(String name) {
public synchronized List<Flow.Subscriber<? extends Message<?>>> getSubscribers(String name) {
Objects.requireNonNull(name, msg.nameMustBeSet());
return subscribers.getOrDefault(name, Collections.emptyList());
}
Expand All @@ -110,17 +108,17 @@ private <T> void register(Map<String, List<T>> multimap, String name, T item) {
}

@Override
public Set<String> getIncomingNames() {
return publishers.keySet();
public synchronized Set<String> getIncomingNames() {
return new HashSet<>(publishers.keySet());
}

@Override
public Set<String> getOutgoingNames() {
return subscribers.keySet();
public synchronized Set<String> getOutgoingNames() {
return new HashSet<>(subscribers.keySet());
}

@Override
public Set<String> getEmitterNames() {
public synchronized Set<String> getEmitterNames() {
return emitters.values().stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet());
}

Expand Down

0 comments on commit 31375e6

Please sign in to comment.