From 6f8a30231ee8dee2093145b4117e6f9fe3e2e82d Mon Sep 17 00:00:00 2001 From: Allard Buijze Date: Sat, 15 Jan 2022 15:19:10 +0100 Subject: [PATCH] Removed overrides for block() methods The implementations are a near-copy of their subscribe() counterpart. Instead of duplicating logic which leads to errors like #1033, the implementation now relies on Mono's implementation of block(). Resolves #1033 Signed-off-by: Allard Buijze --- .../core/FireAndForgetRequesterMono.java | 99 ------------------- .../core/MetadataPushRequesterMono.java | 59 ----------- 2 files changed, 158 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java b/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java index eceb0976c..cb8654b55 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java @@ -26,7 +26,6 @@ import io.rsocket.Payload; import io.rsocket.frame.FrameType; import io.rsocket.plugins.RequestInterceptor; -import java.time.Duration; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -179,104 +178,6 @@ public void cancel() { markTerminated(STATE, this); } - @Override - @Nullable - public Void block(Duration m) { - return block(); - } - - @Override - @Nullable - public Void block() { - long previousState = markSubscribed(STATE, this); - if (isSubscribedOrTerminated(previousState)) { - final IllegalStateException e = - new IllegalStateException("FireAndForgetMono allows only a single Subscriber"); - final RequestInterceptor requestInterceptor = this.requestInterceptor; - if (requestInterceptor != null) { - requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null); - } - throw e; - } - - final Payload p = this.payload; - try { - if (!isValid(this.mtu, this.maxFrameLength, p, false)) { - lazyTerminate(STATE, this); - - final IllegalArgumentException e = - new IllegalArgumentException( - String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)); - - final RequestInterceptor requestInterceptor = this.requestInterceptor; - if (requestInterceptor != null) { - requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata()); - } - - p.release(); - - throw e; - } - } catch (IllegalReferenceCountException e) { - lazyTerminate(STATE, this); - - final RequestInterceptor requestInterceptor = this.requestInterceptor; - if (requestInterceptor != null) { - requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null); - } - - throw Exceptions.propagate(e); - } - - final int streamId; - try { - streamId = this.requesterResponderSupport.getNextStreamId(); - } catch (Throwable t) { - lazyTerminate(STATE, this); - - final RequestInterceptor requestInterceptor = this.requestInterceptor; - if (requestInterceptor != null) { - requestInterceptor.onReject(Exceptions.unwrap(t), FrameType.REQUEST_FNF, p.metadata()); - } - - p.release(); - - throw Exceptions.propagate(t); - } - - final RequestInterceptor interceptor = this.requestInterceptor; - if (interceptor != null) { - interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata()); - } - - try { - sendReleasingPayload( - streamId, - FrameType.REQUEST_FNF, - this.mtu, - this.payload, - this.connection, - this.allocator, - true); - } catch (Throwable e) { - lazyTerminate(STATE, this); - - if (interceptor != null) { - interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, e); - } - - throw Exceptions.propagate(e); - } - - lazyTerminate(STATE, this); - - if (interceptor != null) { - interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, null); - } - - return null; - } - @Override public Object scanUnsafe(Scannable.Attr key) { return null; // no particular key to be represented, still useful in hooks diff --git a/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java b/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java index 226e9a0af..6f37b5485 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java @@ -25,14 +25,12 @@ import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.frame.MetadataPushFrameCodec; -import java.time.Duration; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.util.annotation.NonNull; -import reactor.util.annotation.Nullable; final class MetadataPushRequesterMono extends Mono implements Scannable { @@ -114,63 +112,6 @@ public void subscribe(CoreSubscriber actual) { Operators.complete(actual); } - @Override - @Nullable - public Void block(Duration m) { - return block(); - } - - @Override - @Nullable - public Void block() { - long previousState = markSubscribed(STATE, this); - if (isSubscribedOrTerminated(previousState)) { - throw new IllegalStateException("MetadataPushMono allows only a single Subscriber"); - } - - final Payload p = this.payload; - final ByteBuf metadata; - try { - final boolean hasMetadata = p.hasMetadata(); - metadata = p.metadata(); - if (hasMetadata) { - lazyTerminate(STATE, this); - p.release(); - throw new IllegalArgumentException("Metadata push does not support metadata field"); - } - if (!isValidMetadata(this.maxFrameLength, metadata)) { - lazyTerminate(STATE, this); - p.release(); - throw new IllegalArgumentException("Too Big Payload size"); - } - } catch (IllegalReferenceCountException e) { - lazyTerminate(STATE, this); - throw e; - } - - final ByteBuf metadataRetainedSlice; - try { - metadataRetainedSlice = metadata.retainedSlice(); - } catch (IllegalReferenceCountException e) { - lazyTerminate(STATE, this); - throw e; - } - - try { - p.release(); - } catch (IllegalReferenceCountException e) { - lazyTerminate(STATE, this); - metadataRetainedSlice.release(); - throw e; - } - - final ByteBuf requestFrame = - MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice); - this.connection.sendFrame(0, requestFrame); - - return null; - } - @Override public Object scanUnsafe(Attr key) { return null; // no particular key to be represented, still useful in hooks