diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java index 4e6f7498a1..7b37673302 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -18,44 +18,48 @@ package org.springframework.util.concurrent; import org.springframework.util.Assert; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference; /** - * A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or + * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} + * whose value can be set via {@link #set(Object)} or * {@link #setException(Throwable)}. It may also be cancelled. * *

Inspired by {@code com.google.common.util.concurrent.SettableFuture}. * * @author Mattias Severson + * @author Rossen Stoyanchev * @since 4.1 */ public class SettableListenableFuture implements ListenableFuture { - private final SettableFuture settableFuture = new SettableFuture(); - private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + private final SettableTask settableTask; + private final ListenableFutureTask listenableFuture; + + + public SettableListenableFuture() { + this.settableTask = new SettableTask(); + this.listenableFuture = new ListenableFutureTask(this.settableTask); + } /** * Set the value of this future. This method will return {@code true} if * the value was set successfully, or {@code false} if the future has already * been set or cancelled. - * * @param value the value that will be set. * @return {@code true} if the value was successfully set, else {@code false}. */ public boolean set(T value) { - boolean setValue = this.settableFuture.setValue(value); - if (setValue) { - this.registry.success(value); + boolean success = this.settableTask.setValue(value); + if (success) { + this.listenableFuture.run(); } - return setValue; + return success; } /** @@ -66,66 +70,66 @@ public class SettableListenableFuture implements ListenableFuture { * @return {@code true} if the exception was successfully set, else {@code false}. */ public boolean setException(Throwable exception) { - Assert.notNull(exception, "exception must not be null"); - boolean setException = this.settableFuture.setThrowable(exception); - if (setException) { - this.registry.failure(exception); + Assert.notNull(exception, "'exception' must not be null"); + boolean success = this.settableTask.setValue(exception); + if (success) { + this.listenableFuture.run(); } - return setException; + return success; } - @Override - public void addCallback(ListenableFutureCallback callback) { - this.registry.addCallback(callback); - } + @Override + public void addCallback(ListenableFutureCallback callback) { + this.listenableFuture.addCallback(callback); + } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.settableTask.setCancelled(); + boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning); if (cancelled && mayInterruptIfRunning) { interruptTask(); } return cancelled; - } + } - @Override - public boolean isCancelled() { - return this.settableFuture.isCancelled(); - } + @Override + public boolean isCancelled() { + return this.listenableFuture.isCancelled(); + } - @Override - public boolean isDone() { - return this.settableFuture.isDone(); - } + @Override + public boolean isDone() { + return this.listenableFuture.isDone(); + } /** * Retrieve the value. - *

Will return the value if it has been set by calling {@link #set(Object)}, throw - * an {@link ExecutionException} if the {@link #setException(Throwable)} has been - * called, throw a {@link CancellationException} if the future has been cancelled, or - * throw an {@link IllegalStateException} if neither a value, nor an exception has - * been set. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. * @return The value associated with this future. */ - @Override - public T get() throws InterruptedException, ExecutionException { - return this.settableFuture.get(); - } + @Override + public T get() throws InterruptedException, ExecutionException { + return this.listenableFuture.get(); + } /** * Retrieve the value. - *

Will return the value if it has been by calling {@link #set(Object)}, throw an - * {@link ExecutionException} if the {@link #setException(Throwable)} - * has been called, throw a {@link java.util.concurrent.CancellationException} if the - * future has been cancelled. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. * @param timeout the maximum time to wait. * @param unit the time unit of the timeout argument. * @return The value associated with this future. */ @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return this.settableFuture.get(timeout, unit); - } + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.listenableFuture.get(timeout, unit); + } /** * Subclasses can override this method to implement interruption of the future's @@ -138,143 +142,33 @@ public class SettableListenableFuture implements ListenableFuture { } - /** - * Helper class that keeps track of the state of this future. - * @param The type of value to be set. - */ - private static class SettableFuture implements Future { + private static class SettableTask implements Callable { - private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - private final CountDownLatch latch = new CountDownLatch(1); - private T value; - private Throwable throwable; - private State state = State.INITIALIZED; + private static final String NO_VALUE = SettableListenableFuture.class.getName() + ".NO_VALUE"; + private final AtomicReference value = new AtomicReference(NO_VALUE); - @Override - public T get() throws ExecutionException, InterruptedException { - this.latch.await(); - this.lock.readLock().lock(); - try { - return getValue(); - } - finally { - this.lock.readLock().unlock(); - } - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (this.latch.await(timeout, unit)) { - this.lock.readLock().lock(); - try { - return getValue(); - } - finally { - this.lock.readLock().unlock(); - } - } - else { - throw new TimeoutException(); - } - } + private volatile boolean cancelled = false; - private T getValue() throws ExecutionException { - switch (this.state) { - case COMPLETED: - if (this.throwable != null) { - throw new ExecutionException(this.throwable); - } - else { - return this.value; - } - case CANCELLED: - throw new CancellationException("Future has been cancelled."); - default: - throw new IllegalStateException("Invalid state: " + this.state); - } - } - @Override - public boolean isDone() { - this.lock.readLock().lock(); - try { - switch (this.state) { - case COMPLETED: - case CANCELLED: - return true; - default: - return false; - } - } - finally { - this.lock.readLock().unlock(); + public boolean setValue(Object value) { + if (this.cancelled) { + return false; } + return this.value.compareAndSet(NO_VALUE, value); } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.state = State.CANCELLED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; + public void setCancelled() { + this.cancelled = true; } @Override - public boolean isCancelled() { - this.lock.readLock().lock(); - try { - return this.state.equals(State.CANCELLED); - } - finally { - this.lock.readLock().unlock(); + public T call() throws Exception { + if (value.get() instanceof Exception) { + throw (Exception) value.get(); } + return (T) value.get(); } - - boolean setValue(T value) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.value = value; - this.state = State.COMPLETED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; - } - - Throwable getThrowable() { - return this.throwable; - } - - boolean setThrowable(Throwable throwable) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.throwable = throwable; - this.state = State.COMPLETED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; - } - - private enum State {INITIALIZED, COMPLETED, CANCELLED} } + } diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java index 731bd14226..d16d08a77f 100644 --- a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java +++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + /** * @author Mattias Severson @@ -290,36 +292,28 @@ public class SettableListenableFutureTests { @Test public void cancelDoesNotNotifyCallbacksOnSet() { - settableListenableFuture.addCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(String result) { - fail("onSuccess should not have been called"); - } - - @Override - public void onFailure(Throwable t) { - fail("onFailure should not have been called"); - } - }); + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + settableListenableFuture.set("hello"); + verifyNoMoreInteractions(callback); } @Test public void cancelDoesNotNotifyCallbacksOnSetException() { - settableListenableFuture.addCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(String result) { - fail("onSuccess should not have been called"); - } - - @Override - public void onFailure(Throwable t) { - fail("onFailure should not have been called"); - } - }); + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + settableListenableFuture.setException(new RuntimeException()); + verifyNoMoreInteractions(callback); } private static class InterruptableSettableListenableFuture extends SettableListenableFuture {