Skip to content

Commit

Permalink
make ThreadTimer not singleton again
Browse files Browse the repository at this point in the history
This is for multi-deployment environments, like WildFly, where
multiple applications may exist and each will have its own timer.
  • Loading branch information
Ladicek committed Sep 19, 2024
1 parent 2393d05 commit a9491db
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 57 deletions.
3 changes: 2 additions & 1 deletion doc/modules/ROOT/pages/reference/metrics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ The behavior of the timer thread can be observed through the following metrics:
| Type | `Gauge<Integer>`
| Unit | None
| Description | The number of tasks that are currently scheduled (for future execution) on the timer.
| Tags | None
| Tags
a| * `id` - the ID of the timer, to distinguish multiple timers in a multi-application environment
|===

== Micrometer Support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import io.smallrye.faulttolerance.core.util.RunnableWrapper;
Expand All @@ -18,10 +19,10 @@
* Starts one thread that processes submitted tasks in a loop and when it's time for a task to run,
* it gets submitted to the executor. The default executor is provided by a caller, so the caller
* must shut down this timer <em>before</em> shutting down the executor.
* <p>
* At most one timer may exist.
*/
public final class ThreadTimer implements Timer {
private static final AtomicInteger COUNTER = new AtomicInteger(0);

private static final Comparator<Task> TASK_COMPARATOR = (o1, o2) -> {
// two different instances are never equal
if (o1 == o2) {
Expand All @@ -40,7 +41,7 @@ public final class ThreadTimer implements Timer {
return System.identityHashCode(o1) < System.identityHashCode(o2) ? -1 : 1;
};

private static volatile ThreadTimer INSTANCE;
private final int id;

private final SortedSet<Task> tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR);

Expand All @@ -57,17 +58,9 @@ public final class ThreadTimer implements Timer {
* @param defaultExecutor default {@link Executor} used for running scheduled tasks, unless an executor
* is provided when {@linkplain #schedule(long, Runnable, Executor) scheduling} a task
*/
public static synchronized ThreadTimer create(Executor defaultExecutor) {
ThreadTimer instance = INSTANCE;
if (instance == null) {
instance = new ThreadTimer(defaultExecutor);
INSTANCE = instance;
return instance;
}
throw new IllegalStateException("Timer already exists");
}
public ThreadTimer(Executor defaultExecutor) {
this.id = COUNTER.incrementAndGet();

private ThreadTimer(Executor defaultExecutor) {
this.defaultExecutor = checkNotNull(defaultExecutor, "Executor must be set");

this.thread = new Thread(() -> {
Expand Down Expand Up @@ -112,10 +105,15 @@ private ThreadTimer(Executor defaultExecutor) {
LOG.unexpectedExceptionInTimerLoop(e);
}
}
}, "SmallRye Fault Tolerance Timer");
}, "SmallRye Fault Tolerance Timer " + id);
thread.start();

LOG.createdTimer();
LOG.createdTimer(id);
}

@Override
public int getId() {
return id;
}

@Override
Expand Down Expand Up @@ -144,17 +142,13 @@ public int countScheduledTasks() {
@Override
public void shutdown() throws InterruptedException {
if (running.compareAndSet(true, false)) {
try {
LOG.shutdownTimer();
thread.interrupt();
thread.join();
} finally {
INSTANCE = null;
}
LOG.shutdownTimer(id);
thread.interrupt();
thread.join();
}
}

private static class Task implements TimerTask, Runnable {
private class Task implements TimerTask, Runnable {
// scheduled: present in the `tasks` queue
// running: not present in the `tasks` queue && `runnable != null`
// finished or cancelled: not present in the `tasks` queue && `runnable == null`
Expand All @@ -169,29 +163,22 @@ private static class Task implements TimerTask, Runnable {

@Override
public boolean isDone() {
ThreadTimer timer = INSTANCE;
if (timer != null) {
boolean queued = timer.tasks.contains(this);
if (queued) {
return false;
} else {
return runnable == null;
}
boolean queued = tasks.contains(this);
if (queued) {
return false;
} else {
return runnable == null;
}
return true; // ?
}

@Override
public boolean cancel() {
ThreadTimer timer = INSTANCE;
if (timer != null) {
// can't cancel if it's already running
boolean removed = timer.tasks.remove(this);
if (removed) {
runnable = null;
LOG.cancelledTimerTask(this);
return true;
}
// can't cancel if it's already running
boolean removed = tasks.remove(this);
if (removed) {
runnable = null;
LOG.cancelledTimerTask(this);
return true;
}
return false;
}
Expand All @@ -211,7 +198,7 @@ public void run() {
}
}

private static final class TaskWithExecutor extends Task {
private final class TaskWithExecutor extends Task {
private final Executor executor;

TaskWithExecutor(long startTime, Runnable runnable, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
* {@link #schedule(long, Runnable, Executor)} are executed on the given executor.
*/
public interface Timer {
/**
* Returns the ID of this timer. Timers are guaranteed to have a unique name.
*
* @return the ID of this timer
*/
int getId();

/**
* Schedules the {@code task} to be executed in {@code delayInMillis} on this timer's
* default {@link Executor}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
interface TimerLogger extends BasicLogger {
TimerLogger LOG = Logger.getMessageLogger(TimerLogger.class, TimerLogger.class.getPackage().getName());

@Message(id = NONE, value = "Timer created")
@Message(id = NONE, value = "Timer %s created")
@LogMessage(level = Logger.Level.TRACE)
void createdTimer();
void createdTimer(int id);

@Message(id = NONE, value = "Timer shut down")
@Message(id = NONE, value = "Timer %s shut down")
@LogMessage(level = Logger.Level.TRACE)
void shutdownTimer();
void shutdownTimer(int id);

@Message(id = NONE, value = "Scheduled timer task %s to run in %s millis")
@LogMessage(level = Logger.Level.TRACE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void setUp() {
executor = Executors.newSingleThreadExecutor();

timerExecutor = Executors.newSingleThreadExecutor();
timer = ThreadTimer.create(timerExecutor);
timer = new ThreadTimer(timerExecutor);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ boolean timerTaskCancelled() {
return timerTaskCancelled.get();
}

@Override
public int getId() {
return 0;
}

@Override
public TimerTask schedule(long delayInMillis, Runnable task) {
if (alreadyUsed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
public class TestTimer implements Timer {
private final Queue<Task> tasks = new ConcurrentLinkedQueue<>();

@Override
public int getId() {
return 0;
}

@Override
public TimerTask schedule(long delayInMillis, Runnable runnable) {
Task task = new Task(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ThreadTimerStressTest {
@BeforeEach
public void setUp() throws InterruptedException {
executor = Executors.newFixedThreadPool(POOL_SIZE);
timer = ThreadTimer.create(executor);
timer = new ThreadTimer(executor);

// precreate all threads in the pool
// if we didn't do this, the first few iterations would be dominated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ThreadTimerTest {
@BeforeEach
public void setUp() {
executor = Executors.newSingleThreadExecutor();
timer = ThreadTimer.create(executor);
timer = new ThreadTimer(executor);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ExecutorHolder {
public ExecutorHolder(AsyncExecutorProvider asyncExecutorProvider) {
this.asyncExecutor = asyncExecutorProvider.get();
this.eventLoop = EventLoop.get();
this.timer = ThreadTimer.create(asyncExecutor);
this.timer = new ThreadTimer(asyncExecutor);
this.shouldShutdownAsyncExecutor = asyncExecutorProvider instanceof DefaultAsyncExecutorProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.eclipse.microprofile.metrics.Metadata;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.MetricUnits;
import org.eclipse.microprofile.metrics.Tag;
import org.eclipse.microprofile.metrics.annotation.RegistryType;

import io.smallrye.faulttolerance.ExecutorHolder;
Expand Down Expand Up @@ -42,7 +43,8 @@ void init() {
.withName(MetricsConstants.TIMER_SCHEDULED)
.withUnit(MetricUnits.NONE)
.build();
registry.gauge(metadata, executorHolder.getTimer(), Timer::countScheduledTasks);
Timer timer = executorHolder.getTimer();
registry.gauge(metadata, timer, Timer::countScheduledTasks, new Tag("id", "" + timer.getId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.faulttolerance.metrics;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -10,6 +11,7 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.smallrye.faulttolerance.ExecutorHolder;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
Expand All @@ -34,7 +36,9 @@ public class MicrometerProvider implements MetricsProvider {

@PostConstruct
void init() {
registry.gauge(MetricsConstants.TIMER_SCHEDULED, executorHolder.getTimer(), Timer::countScheduledTasks);
Timer timer = executorHolder.getTimer();
registry.gauge(MetricsConstants.TIMER_SCHEDULED, Collections.singletonList(Tag.of("id", "" + timer.getId())),
timer, Timer::countScheduledTasks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class LazyDependencies implements BuilderLazyDependencies {
this.executor = config.executor();
this.metricsAdapter = config.metricsAdapter();
this.eventLoop = EventLoop.get();
this.timer = ThreadTimer.create(executor);
this.timer = new ThreadTimer(executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.smallrye.faulttolerance.standalone;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsConstants;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
Expand All @@ -19,7 +21,8 @@ public MicrometerAdapter(MeterRegistry registry) {
}

MetricsProvider createMetricsProvider(Timer timer) {
registry.gauge(MetricsConstants.TIMER_SCHEDULED, timer, Timer::countScheduledTasks);
registry.gauge(MetricsConstants.TIMER_SCHEDULED, Collections.singletonList(Tag.of("id", "" + timer.getId())),
timer, Timer::countScheduledTasks);

return new MetricsProvider() {
private final Map<Object, MetricsRecorder> cache = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import static org.awaitility.Awaitility.await;

import java.time.temporal.ChronoUnit;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import org.eclipse.microprofile.metrics.Gauge;
import org.eclipse.microprofile.metrics.MetricID;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.annotation.RegistryType;
Expand Down Expand Up @@ -43,15 +45,15 @@ public void test(@RegistryType(type = MetricRegistry.Type.BASE) MetricRegistry m
assertThat(future).isNotCompleted();

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(1);
assertThat(findTimerGauge(metrics).getValue()).isEqualTo(1);
});

barrier.open();

assertThat(future).succeedsWithin(2, TimeUnit.SECONDS)
.isEqualTo("hello");

assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(0);
assertThat(findTimerGauge(metrics).getValue()).isEqualTo(0);
}

public CompletionStage<String> action() throws InterruptedException {
Expand All @@ -62,4 +64,11 @@ public CompletionStage<String> action() throws InterruptedException {
public CompletionStage<String> fallback() {
return CompletableFuture.completedFuture("fallback");
}

private static Gauge<?> findTimerGauge(MetricRegistry metrics) {
SortedMap<MetricID, Gauge> timers = metrics.getGauges(
(id, metric) -> id.getName().equals(MetricsConstants.TIMER_SCHEDULED));
assertThat(timers).hasSize(1);
return timers.values().iterator().next();
}
}

0 comments on commit a9491db

Please sign in to comment.