diff --git a/subprojects/parseq-guava-interop/src/main/java/com/linkedin/parseq/guava/ListenableFutureUtil.java b/subprojects/parseq-guava-interop/src/main/java/com/linkedin/parseq/guava/ListenableFutureUtil.java index db13131b..620dde66 100644 --- a/subprojects/parseq-guava-interop/src/main/java/com/linkedin/parseq/guava/ListenableFutureUtil.java +++ b/subprojects/parseq-guava-interop/src/main/java/com/linkedin/parseq/guava/ListenableFutureUtil.java @@ -1,9 +1,5 @@ package com.linkedin.parseq.guava; -import com.linkedin.parseq.promise.Promises; -import java.util.Collection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; @@ -15,6 +11,8 @@ import com.linkedin.parseq.promise.SettablePromise; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -29,38 +27,11 @@ private ListenableFutureUtil() { } public static Task fromListenableFuture(ListenableFuture future) { - - /** - * BaseTask's promise will be listening to this - * also see {@link BaseTask#contextRun(Context, Task, Collection)} - */ - final SettablePromise promise = Promises.settable(); - // Setup cancellation propagation from Task -> ListenableFuture. - final Task task = - new BaseTask("fromListenableFuture: " + Task._taskDescriptor.getDescription(future.getClass().getName())) { - @Override - public boolean cancel(Exception rootReason) { - // .cancel()'s result indicates whether cancel() successfully trigger state transition to "CANCELLED" - // And we should only cancel GRPC future when the transition was conducted. - boolean shouldCancelTask = super.cancel(rootReason); - if (shouldCancelTask && !future.isCancelled()) { - boolean futureCancelResult = future.cancel(true); - if (!futureCancelResult) { - LOGGER.warn("Unexpected: GRPC future was not cancelled but new attempt to cancel also failed."); - } - } - return shouldCancelTask; - } - - @Override - protected Promise run(Context context) throws Throwable { - return promise; - } - }; - + final ListenableFutureBridgedTask task = new ListenableFutureBridgedTask(future); // Setup forward event propagation ListenableFuture -> Task. + SettablePromise promise = task.getSettableDelegate(); Runnable callbackRunnable = () -> { if (promise.isDone()) { boolean isPromiseFailed = promise.isFailed(); @@ -143,4 +114,44 @@ public boolean setException(Throwable throwable) { return super.setException(throwable); } } + + @VisibleForTesting + static class ListenableFutureBridgedTask extends BaseTask { + + private final ListenableFuture _future; + + ListenableFutureBridgedTask(ListenableFuture future) { + super("fromListenableFuture: " + Task._taskDescriptor.getDescription(future.getClass().getName())); + _future = future; + } + + @Override + public boolean cancel(Exception rootReason) { + // .cancel()'s result indicates whether cancel() successfully trigger state transition to "CANCELLED" + // And we should only cancel GRPC future when the transition was conducted. + boolean shouldCancelTask = super.cancel(rootReason); + if (shouldCancelTask && !_future.isCancelled()) { + boolean futureCancelResult = _future.cancel(true); + if (!futureCancelResult) { + LOGGER.warn("Unexpected: GRPC future was not cancelled but new attempt to cancel also failed."); + } + } + return shouldCancelTask; + } + + @Override + protected Promise run(Context context) throws Throwable { + return getDelegate(); + } + + /** + * {@inheritDoc} + * + *

Exposed as public for access from {@link ListenableFutureUtil#fromListenableFuture(ListenableFuture)}

+ */ + @Override + public SettablePromise getSettableDelegate() { + return super.getSettableDelegate(); + } + } } diff --git a/subprojects/parseq-guava-interop/src/test/java/com/linkedin/parseq/guava/ListenableFutureUtilTest.java b/subprojects/parseq-guava-interop/src/test/java/com/linkedin/parseq/guava/ListenableFutureUtilTest.java index 11171a98..c5df58f9 100644 --- a/subprojects/parseq-guava-interop/src/test/java/com/linkedin/parseq/guava/ListenableFutureUtilTest.java +++ b/subprojects/parseq-guava-interop/src/test/java/com/linkedin/parseq/guava/ListenableFutureUtilTest.java @@ -62,6 +62,13 @@ public void testFromListenableFuture() throws Exception { Assert.assertTrue(task.isDone()); Assert.assertTrue(task.isFailed()); Assert.assertEquals(task.getError().getCause().getClass(), CancellationException.class); + + listenableFuture = new ListenableFutureUtil.SettableFuture<>(); + task = ListenableFutureUtil.fromListenableFuture(listenableFuture); + + // Test get. + listenableFuture.set("Haha"); + Assert.assertEquals(task.get(), "Haha"); } @Test