diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 7b67009e8..54f339c12 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -406,22 +406,20 @@ private void handleRequestResponse(int streamId, Mono response) { @Override protected void hookOnNext(Payload payload) { - if (isEmpty) { - isEmpty = false; - } + isEmpty = false; if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) { payload.release(); cancel(); - final IllegalArgumentException t = - new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - handleError(streamId, t); + sendingSubscriptions.remove(streamId, this); + handleError(streamId, new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE)); return; } ByteBuf byteBuf = PayloadFrameCodec.encodeNextCompleteReleasingPayload(allocator, streamId, payload); sendProcessor.onNext(byteBuf); + sendingSubscriptions.remove(streamId, this); } @Override @@ -433,10 +431,8 @@ protected void hookOnError(Throwable throwable) { @Override protected void hookOnComplete() { - if (isEmpty) { - if (sendingSubscriptions.remove(streamId, this)) { - sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId)); - } + if (isEmpty && sendingSubscriptions.remove(streamId, this)) { + sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId)); } } };