Skip to content

Commit

Permalink
+ replaced semaphore with client executor pool
Browse files Browse the repository at this point in the history
  • Loading branch information
q3769 committed Sep 30, 2023
1 parent f5c84d1 commit 92ba84a
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 29 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ will have to wait for active ones to run for completion - i.e. the throttling ef

Each individual client can have only one single dedicated executor at any given moment. The executor is backed by a
worker thread pool with maximum size `singleClientMaxConcurrency`. Thus, the client's execution concurrency can never go
beyond, and will always be throttled at `singleClientMaxConcurrency`. The total count of clients that can be serviced in
parallel is then throttled by a semaphore.
beyond, and will always be throttled at `singleClientMaxConcurrency`. The individual executors themselves are then
pooled collectively, at a maximum pool size of `concurrentClientMaxTotal`; this throttles the total number of clients
that can be serviced in parallel.

If both builder parameters are provided, the `Conottle` instance's maximum number of concurrent threads is
the `singleClientMaxConcurrency` multiplied by the `concurrentClientMaxTotal`.
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@

<groupId>io.github.q3769</groupId>
<artifactId>conottle</artifactId>
<version>10.1.2</version>
<version>10.1.3</version>
<packaging>jar</packaging>
<name>conottle</name>
<description>A Java concurrent API to throttle the maximum concurrency to process tasks for any given client while
the total number of clients being serviced in parallel can also be throttled
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>8</maven.compiler.release>
</properties>

<url>https://github.com/q3769/conottle</url>
<licenses>
Expand All @@ -60,6 +64,11 @@
</developers>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down Expand Up @@ -96,10 +105,6 @@
</dependency>
</dependencies>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>8</maven.compiler.release>
</properties>

<build>
<plugins>
Expand Down
51 changes: 35 additions & 16 deletions src/main/java/conottle/Conottle.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.ToString;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.*;
Expand All @@ -39,23 +42,30 @@
@ThreadSafe
@ToString
public final class Conottle implements ClientTaskExecutor, AutoCloseable {
private static final ExecutorService ADMIN_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
private static final int DEFAULT_CONCURRENT_CLIENT_MAX_TOTAL =
Math.max(16, Runtime.getRuntime().availableProcessors());
private static final int DEFAULT_SINGLE_CLIENT_MAX_CONCURRENCY =
Math.max(16, Runtime.getRuntime().availableProcessors());
private static final Logger logger = Logger.instance();
private final ConcurrentMap<Object, ThrottlingExecutor> activeThrottlingExecutors;
private final Semaphore clientTotalQuota;
private final int singleClientConcurrency;
private final ExecutorService adminExecutorService = Executors.newSingleThreadExecutor();
private final ConcurrentMap<Object, TaskCountingExecutor> activeThrottlingExecutors;
private final ObjectPool<TaskCountingExecutor> throttlingExecutorPool;

private Conottle(@NonNull Builder builder) {
this.activeThrottlingExecutors = new ConcurrentHashMap<>(builder.concurrentClientMaxTotal);
singleClientConcurrency = builder.singleClientMaxConcurrency;
clientTotalQuota = new Semaphore(builder.concurrentClientMaxTotal);
this.throttlingExecutorPool =
new GenericObjectPool<>(new PooledExecutorFactory(builder.singleClientMaxConcurrency),
getThrottlingExecutorPoolConfig(builder.concurrentClientMaxTotal));
logger.atTrace().log("Success constructing: {}", this);
}

@NonNull
private static GenericObjectPoolConfig<TaskCountingExecutor> getThrottlingExecutorPoolConfig(int poolSizeMaxTotal) {
GenericObjectPoolConfig<TaskCountingExecutor> throttlingExecutorPoolConfig = new GenericObjectPoolConfig<>();
throttlingExecutorPoolConfig.setMaxTotal(poolSizeMaxTotal);
return throttlingExecutorPoolConfig;
}

@Override
@NonNull
public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Object clientId) {
Expand All @@ -67,7 +77,7 @@ public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Objec
public <V> CompletableFuture<V> submit(@NonNull Callable<V> task, @NonNull Object clientId) {
CompletableFutureHolder<V> taskCompletableFutureHolder = new CompletableFutureHolder<>();
activeThrottlingExecutors.compute(clientId, (k, presentExecutor) -> {
ThrottlingExecutor executor = (presentExecutor == null) ? acquireClientExecutor() : presentExecutor;
TaskCountingExecutor executor = (presentExecutor == null) ? borrowFromPool() : presentExecutor;
taskCompletableFutureHolder.setCompletableFuture(executor.incrementPendingTaskCountAndSubmit(task));
return executor;
});
Expand All @@ -76,31 +86,40 @@ public <V> CompletableFuture<V> submit(@NonNull Callable<V> task, @NonNull Objec
taskCompletableFuture.whenCompleteAsync((r, e) -> activeThrottlingExecutors.computeIfPresent(clientId,
(k, checkedExecutor) -> {
if (checkedExecutor.decrementAndGetPendingTaskCount() == 0) {
clientTotalQuota.release();
checkedExecutor.shutdown();
returnToPool(checkedExecutor);
return null;
}
return checkedExecutor;
}), ADMIN_EXECUTOR_SERVICE);
}), adminExecutorService);
return copy;
}

@Override
public void close() {
activeThrottlingExecutors.values().parallelStream().forEach(ThrottlingExecutor::shutdown);
this.throttlingExecutorPool.close();
}

int countActiveExecutors() {
return activeThrottlingExecutors.size();
}

private ThrottlingExecutor acquireClientExecutor() {
private TaskCountingExecutor borrowFromPool() {
try {
clientTotalQuota.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return throttlingExecutorPool.borrowObject();
} catch (Exception e) {
throw new IllegalStateException("Failed to borrow executor from pool " + throttlingExecutorPool, e);
}
return new ThrottlingExecutor(Executors.newFixedThreadPool(singleClientConcurrency));
}

private void returnToPool(TaskCountingExecutor throttlingExecutor) {
adminExecutorService.submit(() -> {
try {
throttlingExecutorPool.returnObject(throttlingExecutor);
} catch (Exception e) {
logger.atWarn()
.log(e, "Ignoring failure of returning {} to {}", throttlingExecutor, throttlingExecutorPool);
}
});
}

/**
Expand Down
72 changes: 72 additions & 0 deletions src/main/java/conottle/PooledExecutorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2022 Qingtian Wang
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package conottle;

import lombok.NonNull;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.DestroyMode;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import java.util.concurrent.Executors;

/**
* Creates pooled {@link TaskCountingExecutor} instances that provide throttled async client task executions. Each
* {@code TaskCountingExecutor} instance throttles its client task concurrency at the max capacity of the executor's
* backing thread pool.
*/
final class PooledExecutorFactory extends BasePooledObjectFactory<TaskCountingExecutor> {
private final int executorThreadPoolCapacity;

/**
* @param executorThreadPoolCapacity
* max concurrent threads of the {@link TaskCountingExecutor} instance produced by this factory
*/
PooledExecutorFactory(int executorThreadPoolCapacity) {
this.executorThreadPoolCapacity = executorThreadPoolCapacity;
}

@Override
@NonNull
public TaskCountingExecutor create() {
return new TaskCountingExecutor(Executors.newFixedThreadPool(executorThreadPoolCapacity));
}

@Override
@NonNull
public PooledObject<TaskCountingExecutor> wrap(TaskCountingExecutor throttlingExecutor) {
return new DefaultPooledObject<>(throttlingExecutor);
}

@Override
public void destroyObject(PooledObject<TaskCountingExecutor> pooledThrottlingExecutor, DestroyMode destroyMode)
throws Exception {
try {
super.destroyObject(pooledThrottlingExecutor, destroyMode);
} finally {
pooledThrottlingExecutor.getObject().shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import java.util.concurrent.ExecutorService;

/**
* Not thread safe: Any and all public methods always should be externally synchronized while multithreading.
* Not thread safe: Any and all non-private methods always should be externally synchronized while multithreading.
*/
@NotThreadSafe
@ToString
final class ThrottlingExecutor {
final class TaskCountingExecutor {
/**
* Thread pool that throttles the max concurrency of this executor
*/
Expand All @@ -49,7 +49,7 @@ final class ThrottlingExecutor {
* @param throttlingExecutorService
* the backing thread pool facilitating the async executions of this executor.
*/
public ThrottlingExecutor(ExecutorService throttlingExecutorService) {
TaskCountingExecutor(ExecutorService throttlingExecutorService) {
this.throttlingExecutorService = throttlingExecutorService;
}

Expand All @@ -68,15 +68,14 @@ private static <V> V call(Callable<V> task) {
* task. Within the synchronized context, a return value of zero unambiguously indicates no more in-flight
* task pending execution on this executor.
*/
public int decrementAndGetPendingTaskCount() {
int decrementAndGetPendingTaskCount() {
if (pendingTaskCount <= 0) {
throw new IllegalStateException("Cannot further decrement from pending task count: " + pendingTaskCount);
}
return --pendingTaskCount;
}

@NonNull
public <V> CompletableFuture<V> incrementPendingTaskCountAndSubmit(Callable<V> task) {
@NonNull <V> CompletableFuture<V> incrementPendingTaskCountAndSubmit(Callable<V> task) {
pendingTaskCount++;
return CompletableFuture.supplyAsync(() -> call(task), throttlingExecutorService);
}
Expand Down

0 comments on commit 92ba84a

Please sign in to comment.