Make AbstractResponseBodySubscriber.onSubscribe thread-safe

When there are simultaneous invocations of onSubscribe, only the first one should succeed, the rest should cancel the provided subscriptions
master
Violeta Georgieva 8 years ago
parent 11ed847aca
commit 0605c0f3be
  1. 7
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java

@ -18,6 +18,7 @@ package org.springframework.http.server.reactive;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.Channel; import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
@ -169,11 +170,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
@Override @Override
void onSubscribe(AbstractResponseBodySubscriber subscriber, void onSubscribe(AbstractResponseBodySubscriber subscriber,
Subscription subscription) { Subscription subscription) {
if (BackpressureUtils.validate(subscriber.subscription, subscription)) { Objects.requireNonNull(subscription, "Subscription cannot be null");
subscriber.subscription = subscription;
if (subscriber.changeState(this, REQUESTED)) { if (subscriber.changeState(this, REQUESTED)) {
subscriber.subscription = subscription;
subscription.request(1); subscription.request(1);
} }
else {
super.onSubscribe(subscriber, subscription);
} }
} }
}, },

Loading…
Cancel
Save