Skip to content

Commit

Permalink
Fix bridging for get
Browse files Browse the repository at this point in the history
  • Loading branch information
li-kramgopa committed Jun 11, 2024
1 parent 3a580be commit 2de5bcb
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.linkedin.parseq.guava;

import com.linkedin.parseq.promise.Promises;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -15,6 +11,8 @@
import com.linkedin.parseq.promise.SettablePromise;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -29,38 +27,11 @@ private ListenableFutureUtil() {
}

public static <T> Task<T> fromListenableFuture(ListenableFuture<T> future) {

/**
* BaseTask's promise will be listening to this
* also see {@link BaseTask#contextRun(Context, Task, Collection)}
*/
final SettablePromise<T> promise = Promises.settable();

// Setup cancellation propagation from Task -> ListenableFuture.
final Task<T> task =
new BaseTask<T>("fromListenableFuture: " + Task._taskDescriptor.getDescription(future.getClass().getName())) {
@Override
public boolean cancel(Exception rootReason) {
// <BaseTask>.cancel()'s result indicates whether cancel() successfully trigger state transition to "CANCELLED"
// And we should only cancel GRPC future when the transition was conducted.
boolean shouldCancelTask = super.cancel(rootReason);
if (shouldCancelTask && !future.isCancelled()) {
boolean futureCancelResult = future.cancel(true);
if (!futureCancelResult) {
LOGGER.warn("Unexpected: GRPC future was not cancelled but new attempt to cancel also failed.");
}
}
return shouldCancelTask;
}

@Override
protected Promise<? extends T> run(Context context) throws Throwable {
return promise;
}
};

final ListenableFutureBridgedTask<T> task = new ListenableFutureBridgedTask<T>(future);

// Setup forward event propagation ListenableFuture -> Task.
SettablePromise<T> promise = task.getSettableDelegate();
Runnable callbackRunnable = () -> {
if (promise.isDone()) {
boolean isPromiseFailed = promise.isFailed();
Expand Down Expand Up @@ -143,4 +114,44 @@ public boolean setException(Throwable throwable) {
return super.setException(throwable);
}
}

@VisibleForTesting
static class ListenableFutureBridgedTask<T> extends BaseTask<T> {

private final ListenableFuture<T> _future;

ListenableFutureBridgedTask(ListenableFuture<T> future) {
super("fromListenableFuture: " + Task._taskDescriptor.getDescription(future.getClass().getName()));
_future = future;
}

@Override
public boolean cancel(Exception rootReason) {
// <BaseTask>.cancel()'s result indicates whether cancel() successfully trigger state transition to "CANCELLED"
// And we should only cancel GRPC future when the transition was conducted.
boolean shouldCancelTask = super.cancel(rootReason);
if (shouldCancelTask && !_future.isCancelled()) {
boolean futureCancelResult = _future.cancel(true);
if (!futureCancelResult) {
LOGGER.warn("Unexpected: GRPC future was not cancelled but new attempt to cancel also failed.");
}
}
return shouldCancelTask;
}

@Override
protected Promise<? extends T> run(Context context) throws Throwable {
return getDelegate();
}

/**
* {@inheritDoc}
*
* <p>Exposed as public for access from {@link ListenableFutureUtil#fromListenableFuture(ListenableFuture)}</p>
*/
@Override
public SettablePromise<T> getSettableDelegate() {
return super.getSettableDelegate();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public void testFromListenableFuture() throws Exception {
Assert.assertTrue(task.isDone());
Assert.assertTrue(task.isFailed());
Assert.assertEquals(task.getError().getCause().getClass(), CancellationException.class);

listenableFuture = new ListenableFutureUtil.SettableFuture<>();
task = ListenableFutureUtil.fromListenableFuture(listenableFuture);

// Test get.
listenableFuture.set("Haha");
Assert.assertEquals(task.get(), "Haha");
}

@Test
Expand Down

0 comments on commit 2de5bcb

Please sign in to comment.