diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index 38c392408..09eeadb6c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -93,7 +93,6 @@ public Context currentContext() { @Override public void request(long n) { - this.s.request(n); if (!firstRequest) { try { this.hookOnRemainingRequests(n); @@ -115,6 +114,15 @@ public void request(long n) { if (firstLoop) { firstLoop = false; try { + // since in all the scenarios where RequestOperator is used, the + // CorePublisher is either UnicastProcessor or UnicastProcessor.next() + // we are free to propagate unbounded demand to that publisher right after + // the first request happens. UnicastProcessor is only there to allow sending signals from + // the + // connection to a real subscriber and does not have to check the real demand + // For more info see + // https://github.com/rsocket/rsocket/blob/master/Protocol.md#handling-the-unexpected + this.s.request(Long.MAX_VALUE); this.hookOnFirstRequest(n); } catch (Throwable throwable) { onError(throwable); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 45770d375..1ce68cfeb 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -1142,6 +1142,51 @@ public void testWorkaround858() { rule.assertHasNoLeaks(); } + @ParameterizedTest + @ValueSource(strings = {"stream", "channel"}) + // see https://github.com/rsocket/rsocket-java/issues/959 + public void testWorkaround959(String type) { + for (int i = 1; i < 20000; i += 2) { + ByteBuf buffer = rule.alloc().buffer(); + buffer.writeCharSequence("test", CharsetUtil.UTF_8); + + final AssertSubscriber assertSubscriber = new AssertSubscriber<>(3); + if (type.equals("stream")) { + rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber); + } else if (type.equals("channel")) { + rule.socket + .requestChannel(Flux.just(ByteBufPayload.create(buffer))) + .subscribe(assertSubscriber); + } + + final ByteBuf payloadFrame = + PayloadFrameCodec.encode( + rule.alloc(), i, false, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); + + RaceTestUtils.race( + () -> { + rule.connection.addToReceivedBuffer(payloadFrame.copy()); + rule.connection.addToReceivedBuffer(payloadFrame.copy()); + rule.connection.addToReceivedBuffer(payloadFrame); + }, + () -> { + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + }); + + Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release); + + Assertions.assertThat(rule.socket.isDisposed()).isFalse(); + + assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease); + assertSubscriber.assertNoError(); + + rule.connection.clearSendReceiveBuffers(); + rule.assertHasNoLeaks(); + } + } + public static class ClientSocketRule extends AbstractSocketRule { @Override protected RSocketRequester newRSocket() {