diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 0b23bcde5..e716b8fcb 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -114,10 +114,10 @@ public Mono acceptRSocketSetup( final ServerRSocketSession serverRSocketSession = new ServerRSocketSession( resumeToken, - duplexConnection, resumableDuplexConnection, - resumeSessionDuration, + duplexConnection, resumableFramesStore, + resumeSessionDuration, cleanupStoreOnKeepAlive); sessionManager.save(serverRSocketSession, resumeToken); diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java index 6a3ab40d3..4fd18d041 100644 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java @@ -23,7 +23,7 @@ import io.rsocket.resume.ResumeStateHolder; import java.time.Duration; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Consumer; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -38,11 +38,19 @@ public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor { final Duration keepAliveTimeout; final long keepAliveTimeoutMillis; - final AtomicBoolean started = new AtomicBoolean(); + volatile int state; + static final AtomicIntegerFieldUpdater STATE = + AtomicIntegerFieldUpdater.newUpdater(KeepAliveSupport.class, "state"); + + static final int STOPPED_STATE = 0; + static final int STARTING_STATE = 1; + static final int STARTED_STATE = 2; + static final int DISPOSED_STATE = -1; volatile Consumer onTimeout; volatile Consumer onFrameSent; - volatile Disposable ticksDisposable; + + Disposable ticksDisposable; volatile ResumeStateHolder resumeStateHolder; volatile long lastReceivedMillis; @@ -57,25 +65,30 @@ private KeepAliveSupport( } public KeepAliveSupport start() { - this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS); - if (started.compareAndSet(false, true)) { - ticksDisposable = + if (this.state == STOPPED_STATE && STATE.compareAndSet(this, STOPPED_STATE, STARTING_STATE)) { + this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS); + + final Disposable disposable = Flux.interval(keepAliveInterval, scheduler).subscribe(v -> onIntervalTick()); + this.ticksDisposable = disposable; + + if (this.state != STARTING_STATE + || !STATE.compareAndSet(this, STARTING_STATE, STARTED_STATE)) { + disposable.dispose(); + } } return this; } public void stop() { - if (started.compareAndSet(true, false)) { - ticksDisposable.dispose(); - } + terminate(STOPPED_STATE); } @Override public void receive(ByteBuf keepAliveFrame) { this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS); if (resumeStateHolder != null) { - long remoteLastReceivedPos = remoteLastReceivedPosition(keepAliveFrame); + final long remoteLastReceivedPos = KeepAliveFrameCodec.lastPosition(keepAliveFrame); resumeStateHolder.onImpliedPosition(remoteLastReceivedPos); } if (KeepAliveFrameCodec.respondFlag(keepAliveFrame)) { @@ -104,6 +117,16 @@ public KeepAliveSupport onTimeout(Consumer onTimeout) { return this; } + @Override + public void dispose() { + terminate(DISPOSED_STATE); + } + + @Override + public boolean isDisposed() { + return ticksDisposable.isDisposed(); + } + abstract void onIntervalTick(); void send(ByteBuf frame) { @@ -122,22 +145,24 @@ void tryTimeout() { } } - long localLastReceivedPosition() { - return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0; - } + void terminate(int terminationState) { + for (; ; ) { + final int state = this.state; - long remoteLastReceivedPosition(ByteBuf keepAliveFrame) { - return KeepAliveFrameCodec.lastPosition(keepAliveFrame); - } + if (state == STOPPED_STATE || state == DISPOSED_STATE) { + return; + } - @Override - public void dispose() { - stop(); + final Disposable disposable = this.ticksDisposable; + if (STATE.compareAndSet(this, state, terminationState)) { + disposable.dispose(); + return; + } + } } - @Override - public boolean isDisposed() { - return ticksDisposable.isDisposed(); + long localLastReceivedPosition() { + return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0; } public static final class ClientKeepAliveSupport extends KeepAliveSupport { diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index c58cc4954..8dcf67a0b 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -22,12 +22,14 @@ import io.rsocket.DuplexConnection; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.exceptions.Exceptions; +import io.rsocket.exceptions.RejectedResumeException; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.frame.FrameType; import io.rsocket.frame.ResumeFrameCodec; import io.rsocket.frame.ResumeOkFrameCodec; import io.rsocket.keepalive.KeepAliveSupport; import java.time.Duration; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import org.reactivestreams.Subscription; @@ -109,22 +111,21 @@ public ClientRSocketSession( resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe(); resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect); - - S.lazySet(this, Operators.cancelledSubscription()); } void reconnect(int index) { - if (this.s == Operators.cancelledSubscription() - && S.compareAndSet(this, Operators.cancelledSubscription(), null)) { - keepAliveSupport.stop(); - if (logger.isDebugEnabled()) { - logger.debug( - "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", - session, - index); - } - connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this); + keepAliveSupport.stop(); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", + session, + index); } + connectionFactory + .doOnNext(this::tryReestablishSession) + .retryWhen(retry) + .timeout(resumeSessionDuration) + .subscribe(this); } @Override @@ -159,31 +160,10 @@ public boolean isDisposed() { return resumableConnection.isDisposed(); } - @Override - public void onSubscribe(Subscription s) { - if (Operators.setOnce(S, this, s)) { - s.request(Long.MAX_VALUE); - } - } - - @Override - public void onNext(Tuple2 tuple2) { + void tryReestablishSession(Tuple2 tuple2) { ByteBuf shouldBeResumeOKFrame = tuple2.getT1(); DuplexConnection nextDuplexConnection = tuple2.getT2(); - if (!Operators.terminate(S, this)) { - if (logger.isDebugEnabled()) { - logger.debug( - "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", - session); - } - final ConnectionErrorException connectionErrorException = - new ConnectionErrorException("resumption_server=[Session Expired]"); - nextDuplexConnection.sendErrorAndClose(connectionErrorException); - nextDuplexConnection.receive().subscribe().dispose(); - return; - } - final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame); if (streamId != 0) { if (logger.isDebugEnabled()) { @@ -196,7 +176,8 @@ public void onNext(Tuple2 tuple2) { resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); - return; + + throw connectionErrorException; // throw to retry connection again } final FrameType frameType = FrameHeaderCodec.nativeFrameType(shouldBeResumeOKFrame); @@ -208,33 +189,56 @@ public void onNext(Tuple2 tuple2) { // observed final long position = resumableFramesStore.framePosition(); final long impliedPosition = resumableFramesStore.frameImpliedPosition(); - logger.debug( - "Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", - session, - remoteImpliedPos, - impliedPosition, - position); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", + session, + remoteImpliedPos, + impliedPosition, + position); + } if (position <= remoteImpliedPos) { try { if (position != remoteImpliedPos) { resumableFramesStore.releaseFrames(remoteImpliedPos); } } catch (IllegalStateException e) { - logger.debug("Exception occurred while releasing frames in the frameStore", e); - resumableConnection.dispose(e); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Exception occurred while releasing frames in the frameStore", + session, + e); + } final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e); + + resumableConnection.dispose(t); + nextDuplexConnection.sendErrorAndClose(t); nextDuplexConnection.receive().subscribe().dispose(); + return; } - if (resumableConnection.connect(nextDuplexConnection)) { - keepAliveSupport.start(); + if (!tryCancelSessionTimeout()) { if (logger.isDebugEnabled()) { logger.debug( - "Side[client]|Session[{}]. Session has been resumed successfully", session); + "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", + session); } - } else { + final ConnectionErrorException connectionErrorException = + new ConnectionErrorException("resumption_server=[Session Expired]"); + nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); + return; + } + + keepAliveSupport.start(); + + if (logger.isDebugEnabled()) { + logger.debug("Side[client]|Session[{}]. Session has been resumed successfully", session); + } + + if (!resumableConnection.connect(nextDuplexConnection)) { if (logger.isDebugEnabled()) { logger.debug( "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", @@ -244,41 +248,96 @@ public void onNext(Tuple2 tuple2) { new ConnectionErrorException("resumption_server_pos=[Session Expired]"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); + // no need to do anything since connection resumable connection is liklly to + // be disposed } } else { - logger.debug( - "Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", - session, - remoteImpliedPos, - position); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", + session, + remoteImpliedPos, + position); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]"); + resumableConnection.dispose(connectionErrorException); + nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); } } else if (frameType == FrameType.ERROR) { final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame); - logger.debug("Received error frame. Terminating received connection", exception); - resumableConnection.dispose(exception); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Received error frame. Terminating received connection", + session, + exception); + } + if (exception instanceof RejectedResumeException) { + resumableConnection.dispose(exception); + nextDuplexConnection.dispose(); + nextDuplexConnection.receive().subscribe().dispose(); + return; + } + + nextDuplexConnection.dispose(); + throw exception; // assume retryable exception } else { - logger.debug( - "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); + resumableConnection.dispose(connectionErrorException); + nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); + + // no need to do anything since remote server rejected our connection completely + } + } + + boolean tryCancelSessionTimeout() { + for (; ; ) { + final Subscription subscription = this.s; + + if (subscription == Operators.cancelledSubscription()) { + return false; + } + + if (S.compareAndSet(this, subscription, null)) { + subscription.cancel(); + return true; + } } } + @Override + public void onSubscribe(Subscription s) { + if (Operators.setOnce(S, this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Tuple2 objects) {} + @Override public void onError(Throwable t) { if (!Operators.terminate(S, this)) { Operators.onErrorDropped(t, currentContext()); } - resumableConnection.dispose(t); + if (t instanceof TimeoutException) { + resumableConnection.dispose(); + } else { + resumableConnection.dispose(t); + } } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 18cd7167a..933ac09ca 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -103,21 +103,18 @@ public boolean connect(DuplexConnection nextConnection) { } void initConnection(DuplexConnection nextConnection) { - if (logger.isDebugEnabled()) { - logger.debug( - "Side[{}]|Session[{}]. Connecting to DuplexConnection[{}]", - side, - session, - nextConnection); - } - - final int currentConnectionIndex = connectionIndex; + final int nextConnectionIndex = this.connectionIndex + 1; final FrameReceivingSubscriber frameReceivingSubscriber = new FrameReceivingSubscriber(side, resumableFramesStore, receiveSubscriber); - this.connectionIndex = currentConnectionIndex + 1; + this.connectionIndex = nextConnectionIndex; this.activeReceivingSubscriber = frameReceivingSubscriber; + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]|DuplexConnection[{}]. Connecting", side, session, connectionIndex); + } + final Disposable resumeStreamSubscription = resumableFramesStore .resumeStream() @@ -136,17 +133,18 @@ void initConnection(DuplexConnection nextConnection) { resumeStreamSubscription.dispose(); if (logger.isDebugEnabled()) { logger.debug( - "Side[{}]|Session[{}]. Disconnected from DuplexConnection[{}]", + "Side[{}]|Session[{}]|DuplexConnection[{}]. Disconnected", side, session, - nextConnection); + connectionIndex); } - Sinks.EmitResult result = onConnectionClosedSink.tryEmitNext(currentConnectionIndex); + Sinks.EmitResult result = onConnectionClosedSink.tryEmitNext(nextConnectionIndex); if (!result.equals(Sinks.EmitResult.OK)) { logger.error( - "Side[{}]|Session[{}]. Failed to notify session of closed connection: {}", + "Side[{}]|Session[{}]|DuplexConnection[{}]. Failed to notify session of closed connection: {}", side, session, + connectionIndex, result); } }) @@ -193,14 +191,18 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) { null, t -> { framesSaverDisposable.dispose(); + activeReceivingSubscriber.dispose(); savableFramesSender.dispose(); onConnectionClosedSink.tryEmitComplete(); + onClose.tryEmitError(t); }, () -> { framesSaverDisposable.dispose(); + activeReceivingSubscriber.dispose(); savableFramesSender.dispose(); onConnectionClosedSink.tryEmitComplete(); + final Throwable cause = rSocketErrorException.getCause(); if (cause == null) { onClose.tryEmitEmpty(); @@ -242,7 +244,11 @@ void dispose(@Nullable Throwable e) { } if (logger.isDebugEnabled()) { - logger.debug("Side[{}]|Session[{}]. Disposing...", side, session); + logger.debug( + "Side[{}]|Session[{}]|DuplexConnection[{}]. Disposing...", + side, + session, + connectionIndex); } framesSaverDisposable.dispose(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index a57899cac..83c5bf8c1 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -44,7 +44,7 @@ public class ServerRSocketSession final ResumableDuplexConnection resumableConnection; final Duration resumeSessionDuration; final ResumableFramesStore resumableFramesStore; - final String resumeToken; + final String session; final ByteBufAllocator allocator; final boolean cleanupStoreOnKeepAlive; @@ -66,13 +66,13 @@ public class ServerRSocketSession KeepAliveSupport keepAliveSupport; public ServerRSocketSession( - ByteBuf resumeToken, - DuplexConnection initialDuplexConnection, + ByteBuf session, ResumableDuplexConnection resumableDuplexConnection, - Duration resumeSessionDuration, + DuplexConnection initialDuplexConnection, ResumableFramesStore resumableFramesStore, + Duration resumeSessionDuration, boolean cleanupStoreOnKeepAlive) { - this.resumeToken = resumeToken.toString(CharsetUtil.UTF_8); + this.session = session.toString(CharsetUtil.UTF_8); this.allocator = initialDuplexConnection.alloc(); this.resumeSessionDuration = resumeSessionDuration; this.resumableFramesStore = resumableFramesStore; @@ -88,8 +88,14 @@ public ServerRSocketSession( void tryTimeoutSession() { keepAliveSupport.stop(); + + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Connection is lost. Trying to timeout the active session", + session); + } + Mono.delay(resumeSessionDuration).subscribe(this); - logger.debug("Connection is lost. Trying to timeout the active session[{}]", resumeToken); if (WIP.decrementAndGet(this) == 0) { return; @@ -103,6 +109,10 @@ void tryTimeoutSession() { public void resumeWith(ByteBuf resumeFrame, DuplexConnection nextDuplexConnection) { + if (logger.isDebugEnabled()) { + logger.debug("Side[server]|Session[{}]. New DuplexConnection received.", session); + } + long remotePos = ResumeFrameCodec.firstAvailableClientPos(resumeFrame); long remoteImpliedPos = ResumeFrameCodec.lastReceivedServerPos(resumeFrame); @@ -119,36 +129,30 @@ public void resumeWith(ByteBuf resumeFrame, DuplexConnection nextDuplexConnectio } void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplexConnection) { + if (!tryCancelSessionTimeout()) { + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Session has already been expired. Terminating received connection", + session); + } + final RejectedResumeException rejectedResumeException = + new RejectedResumeException("resume_internal_error: Session Expired"); + nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); + return; + } long impliedPosition = resumableFramesStore.frameImpliedPosition(); long position = resumableFramesStore.framePosition(); if (logger.isDebugEnabled()) { logger.debug( - "Side[server]|Session[{}]. Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", - resumeToken, - remoteImpliedPos, - remotePos, + "Side[server]|Session[{}]. Resume FRAME received. ServerResumeState[impliedPosition[{}], position[{}]]. ClientResumeState[remoteImpliedPosition[{}], remotePosition[{}]]", + session, impliedPosition, - position); - } - - for (; ; ) { - final Subscription subscription = this.s; - - if (subscription == Operators.cancelledSubscription()) { - logger.debug("Session has already been expired. Terminating received connection"); - final RejectedResumeException rejectedResumeException = - new RejectedResumeException("resume_internal_error: Session Expired"); - nextDuplexConnection.sendErrorAndClose(rejectedResumeException); - nextDuplexConnection.receive().subscribe().dispose(); - return; - } - - if (S.compareAndSet(this, subscription, null)) { - subscription.cancel(); - break; - } + position, + remoteImpliedPos, + remotePos); } if (remotePos <= impliedPosition && position <= remoteImpliedPos) { @@ -160,44 +164,60 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex if (logger.isDebugEnabled()) { logger.debug( "Side[server]|Session[{}]. ResumeOKFrame[impliedPosition[{}]] has been sent", - resumeToken, + session, impliedPosition); } } catch (Throwable t) { - logger.debug("Exception occurred while releasing frames in the frameStore", t); - tryTimeoutSession(); - nextDuplexConnection.sendErrorAndClose(new RejectedResumeException(t.getMessage(), t)); - nextDuplexConnection.receive().subscribe().dispose(); - return; - } - if (resumableConnection.connect(nextDuplexConnection)) { - keepAliveSupport.start(); if (logger.isDebugEnabled()) { logger.debug( - "Side[server]|Session[{}]. Session has been resumed successfully", resumeToken); + "Side[server]|Session[{}]. Exception occurred while releasing frames in the frameStore", + session, + t); } - } else { + + dispose(); + + final RejectedResumeException rejectedResumeException = + new RejectedResumeException(t.getMessage(), t); + nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); + + return; + } + + keepAliveSupport.start(); + + if (logger.isDebugEnabled()) { + logger.debug("Side[server]|Session[{}]. Session has been resumed successfully", session); + } + + if (!resumableConnection.connect(nextDuplexConnection)) { if (logger.isDebugEnabled()) { logger.debug( "Side[server]|Session[{}]. Session has already been expired. Terminating received connection", - resumeToken); + session); } - final ConnectionErrorException connectionErrorException = - new ConnectionErrorException("resume_internal_error: Session Expired"); - nextDuplexConnection.sendErrorAndClose(connectionErrorException); + final RejectedResumeException rejectedResumeException = + new RejectedResumeException("resume_internal_error: Session Expired"); + nextDuplexConnection.sendErrorAndClose(rejectedResumeException); nextDuplexConnection.receive().subscribe().dispose(); + + // resumableConnection is likely to be disposed at this stage. Thus we have + // nothing to do } } else { if (logger.isDebugEnabled()) { logger.debug( "Side[server]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", - resumeToken, + session, remoteImpliedPos, position, remotePos, impliedPosition); } - tryTimeoutSession(); + + dispose(); + final RejectedResumeException rejectedResumeException = new RejectedResumeException( String.format( @@ -208,6 +228,21 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex } } + boolean tryCancelSessionTimeout() { + for (; ; ) { + final Subscription subscription = this.s; + + if (subscription == Operators.cancelledSubscription()) { + return false; + } + + if (S.compareAndSet(this, subscription, null)) { + subscription.cancel(); + return true; + } + } + } + @Override public long impliedPosition() { return resumableFramesStore.frameImpliedPosition(); @@ -216,7 +251,11 @@ public long impliedPosition() { @Override public void onImpliedPosition(long remoteImpliedPos) { if (cleanupStoreOnKeepAlive) { - resumableFramesStore.releaseFrames(remoteImpliedPos); + try { + resumableFramesStore.releaseFrames(remoteImpliedPos); + } catch (Throwable e) { + resumableConnection.sendErrorAndClose(new ConnectionErrorException(e.getMessage(), e)); + } } } diff --git a/rsocket-core/src/test/java/io/rsocket/resume/ClientRSocketSessionTest.java b/rsocket-core/src/test/java/io/rsocket/resume/ClientRSocketSessionTest.java new file mode 100644 index 000000000..34d8a7345 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/resume/ClientRSocketSessionTest.java @@ -0,0 +1,446 @@ +package io.rsocket.resume; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import io.rsocket.FrameAssert; +import io.rsocket.exceptions.ConnectionCloseException; +import io.rsocket.exceptions.RejectedResumeException; +import io.rsocket.frame.ErrorFrameCodec; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.KeepAliveFrameCodec; +import io.rsocket.frame.ResumeOkFrameCodec; +import io.rsocket.keepalive.KeepAliveSupport; +import io.rsocket.test.util.TestClientTransport; +import io.rsocket.test.util.TestDuplexConnection; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Operators; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; +import reactor.util.function.Tuples; +import reactor.util.retry.Retry; + +public class ClientRSocketSessionTest { + + @Test + void sessionTimeoutSmokeTest() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ClientRSocketSession session = + new ClientRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.connect().delaySubscription(Duration.ofMillis(1)), + c -> { + AtomicBoolean firstHandled = new AtomicBoolean(); + return ((TestDuplexConnection) c) + .receive() + .next() + .doOnNext(__ -> firstHandled.set(true)) + .doOnCancel( + () -> { + if (firstHandled.compareAndSet(false, true)) { + c.dispose(); + } + }) + .map(b -> Tuples.of(b, c)); + }, + framesStore, + Duration.ofMinutes(1), + Retry.indefinitely(), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + // deactivate connection + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time so new connection is received + virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(1)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(50)); + // timeout should not terminate current connection + assertThat(transport.testConnection().isDisposed()).isFalse(); + + // send RESUME_OK frame + transport.testConnection().addToReceivedBuffer(ResumeOkFrameCodec.encode(transport.alloc(), 0)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be terminated + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(15)); + + // disconnects for the second time + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time so new connection is received + virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(1)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + transport + .testConnection() + .addToReceivedBuffer( + ErrorFrameCodec.encode( + transport.alloc(), 0, new ConnectionCloseException("some message"))); + // connection should be closed because of the wrong first frame + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout is still in progress + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(30)); + // should obtain new connection + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_OK frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(30)); + + assertThat(session.s).isEqualTo(Operators.cancelledSubscription()); + assertThat(transport.testConnection().isDisposed()).isTrue(); + + assertThat(session.isDisposed()).isTrue(); + + resumableDuplexConnection.onClose().as(StepVerifier::create).expectComplete().verify(); + transport.alloc().assertHasNoLeaks(); + } + + @Test + void sessionTerminationOnWrongFrameTest() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ClientRSocketSession session = + new ClientRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.connect().delaySubscription(Duration.ofMillis(1)), + c -> { + AtomicBoolean firstHandled = new AtomicBoolean(); + return ((TestDuplexConnection) c) + .receive() + .next() + .doOnNext(__ -> firstHandled.set(true)) + .doOnCancel( + () -> { + if (firstHandled.compareAndSet(false, true)) { + c.dispose(); + } + }) + .map(b -> Tuples.of(b, c)); + }, + framesStore, + Duration.ofMinutes(1), + Retry.indefinitely(), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + // deactivate connection + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time so new connection is received + virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(1)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(50)); + // timeout should not terminate current connection + assertThat(transport.testConnection().isDisposed()).isFalse(); + + // send RESUME_OK frame + transport.testConnection().addToReceivedBuffer(ResumeOkFrameCodec.encode(transport.alloc(), 0)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be terminated + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(15)); + + // disconnects for the second time + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time so new connection is received + virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(1)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + // Send KEEPALIVE frame as a first frame + transport + .testConnection() + .addToReceivedBuffer( + KeepAliveFrameCodec.encode(transport.alloc(), false, 0, Unpooled.EMPTY_BUFFER)); + + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(30)); + + assertThat(session.s).isEqualTo(Operators.cancelledSubscription()); + assertThat(transport.testConnection().isDisposed()).isTrue(); + assertThat(session.isDisposed()).isTrue(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.ERROR) + .matches(ReferenceCounted::release); + + resumableDuplexConnection + .onClose() + .as(StepVerifier::create) + .expectErrorMessage("RESUME_OK frame must be received before any others") + .verify(); + transport.alloc().assertHasNoLeaks(); + } + + @Test + void shouldErrorWithNoRetriesOnErrorFrameTest() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ClientRSocketSession session = + new ClientRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.connect().delaySubscription(Duration.ofMillis(1)), + c -> { + AtomicBoolean firstHandled = new AtomicBoolean(); + return ((TestDuplexConnection) c) + .receive() + .next() + .doOnNext(__ -> firstHandled.set(true)) + .doOnCancel( + () -> { + if (firstHandled.compareAndSet(false, true)) { + c.dispose(); + } + }) + .map(b -> Tuples.of(b, c)); + }, + framesStore, + Duration.ofMinutes(1), + Retry.indefinitely(), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + // deactivate connection + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time so new connection is received + virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(1)); + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME) + .matches(ReferenceCounted::release); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(50)); + // timeout should not terminate current connection + assertThat(transport.testConnection().isDisposed()).isFalse(); + + // send REJECTED_RESUME_ERROR frame + transport + .testConnection() + .addToReceivedBuffer( + ErrorFrameCodec.encode( + transport.alloc(), 0, new RejectedResumeException("failed resumption"))); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // timeout should be terminated + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isTrue(); + + resumableDuplexConnection + .onClose() + .as(StepVerifier::create) + .expectError(RejectedResumeException.class) + .verify(); + transport.alloc().assertHasNoLeaks(); + } + + @Test + void shouldTerminateConnectionOnIllegalStateInKeepAliveFrame() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ClientRSocketSession session = + new ClientRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.connect().delaySubscription(Duration.ofMillis(1)), + c -> { + AtomicBoolean firstHandled = new AtomicBoolean(); + return ((TestDuplexConnection) c) + .receive() + .next() + .doOnNext(__ -> firstHandled.set(true)) + .doOnCancel( + () -> { + if (firstHandled.compareAndSet(false, true)) { + c.dispose(); + } + }) + .map(b -> Tuples.of(b, c)); + }, + framesStore, + Duration.ofMinutes(1), + Retry.indefinitely(), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + keepAliveSupport.resumeState(session); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + final ByteBuf keepAliveFrame = + KeepAliveFrameCodec.encode(transport.alloc(), false, 1529, Unpooled.EMPTY_BUFFER); + keepAliveSupport.receive(keepAliveFrame); + keepAliveFrame.release(); + + assertThat(transport.testConnection().isDisposed()).isTrue(); + // timeout should be terminated + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isTrue(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.ERROR) + .matches(ReferenceCounted::release); + + resumableDuplexConnection.onClose().as(StepVerifier::create).expectComplete().verify(); + + transport.alloc().assertHasNoLeaks(); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/resume/ResumeCalculatorTest.java b/rsocket-core/src/test/java/io/rsocket/resume/ResumeCalculatorTest.java deleted file mode 100644 index d15abd189..000000000 --- a/rsocket-core/src/test/java/io/rsocket/resume/ResumeCalculatorTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/// * -// * Copyright 2015-2019 the original author or authors. -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// package io.rsocket.resume; -// -// import org.junit.jupiter.api.Assertions; -// import org.junit.jupiter.api.BeforeEach; -// import org.junit.jupiter.api.Test; -// -// public class ResumeCalculatorTest { -// -// @BeforeEach -// void setUp() {} -// -// @Test -// void clientResumeSuccess() { -// long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 42, -1, 3); -// Assertions.assertEquals(3, position); -// } -// -// @Test -// void clientResumeError() { -// long position = ResumableDuplexConnection.calculateRemoteImpliedPos(4, 42, -1, 3); -// Assertions.assertEquals(-1, position); -// } -// -// @Test -// void serverResumeSuccess() { -// long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 42, 4, 23); -// Assertions.assertEquals(23, position); -// } -// -// @Test -// void serverResumeErrorClientState() { -// long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 3, 4, 23); -// Assertions.assertEquals(-1, position); -// } -// -// @Test -// void serverResumeErrorServerState() { -// long position = ResumableDuplexConnection.calculateRemoteImpliedPos(4, 42, 4, 1); -// Assertions.assertEquals(-1, position); -// } -// } diff --git a/rsocket-core/src/test/java/io/rsocket/resume/ServerRSocketSessionTest.java b/rsocket-core/src/test/java/io/rsocket/resume/ServerRSocketSessionTest.java new file mode 100644 index 000000000..a3a682d94 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/resume/ServerRSocketSessionTest.java @@ -0,0 +1,182 @@ +package io.rsocket.resume; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import io.rsocket.FrameAssert; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.KeepAliveFrameCodec; +import io.rsocket.frame.ResumeFrameCodec; +import io.rsocket.keepalive.KeepAliveSupport; +import io.rsocket.test.util.TestClientTransport; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Operators; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; + +public class ServerRSocketSessionTest { + + @Test + void sessionTimeoutSmokeTest() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ServerRSocketSession session = + new ServerRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.testConnection(), + framesStore, + Duration.ofMinutes(1), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + // deactivate connection + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // resubscribe so a new connection is generated + transport.connect().subscribe(); + + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(50)); + // timeout should not terminate current connection + assertThat(transport.testConnection().isDisposed()).isFalse(); + + // send RESUME frame + final ByteBuf resumeFrame = + ResumeFrameCodec.encode(transport.alloc(), Unpooled.EMPTY_BUFFER, 0, 0); + session.resumeWith(resumeFrame, transport.testConnection()); + resumeFrame.release(); + + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be terminated + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.RESUME_OK) + .matches(ReferenceCounted::release); + + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(15)); + + // disconnects for the second time + transport.testConnection().dispose(); + assertThat(transport.testConnection().isDisposed()).isTrue(); + // ensures timeout has been started + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + transport.connect().subscribe(); + + assertThat(transport.testConnection().isDisposed()).isFalse(); + // timeout should be still active since no RESUME_Ok frame has been received yet + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isFalse(); + + // advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(61)); + + final ByteBuf resumeFrame1 = + ResumeFrameCodec.encode(transport.alloc(), Unpooled.EMPTY_BUFFER, 0, 0); + session.resumeWith(resumeFrame1, transport.testConnection()); + resumeFrame1.release(); + + // should obtain new connection + assertThat(transport.testConnection().isDisposed()).isTrue(); + // timeout should be still active since no RESUME_OK frame has been received yet + assertThat(session.s).isEqualTo(Operators.cancelledSubscription()); + assertThat(session.isDisposed()).isTrue(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.ERROR) + .matches(ReferenceCounted::release); + + resumableDuplexConnection.onClose().as(StepVerifier::create).expectComplete().verify(); + transport.alloc().assertHasNoLeaks(); + } + + @Test + void shouldTerminateConnectionOnIllegalStateInKeepAliveFrame() { + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.getOrSet(); + final TestClientTransport transport = new TestClientTransport(); + final InMemoryResumableFramesStore framesStore = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 100); + + transport.connect().subscribe(); + + final ResumableDuplexConnection resumableDuplexConnection = + new ResumableDuplexConnection( + "test", Unpooled.EMPTY_BUFFER, transport.testConnection(), framesStore); + + resumableDuplexConnection.receive().subscribe(); + + final ServerRSocketSession session = + new ServerRSocketSession( + Unpooled.EMPTY_BUFFER, + resumableDuplexConnection, + transport.testConnection(), + framesStore, + Duration.ofMinutes(1), + true); + + final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = + new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000); + keepAliveSupport.resumeState(session); + session.setKeepAliveSupport(keepAliveSupport); + + // connection is active. just advance time + virtualTimeScheduler.advanceTimeBy(Duration.ofSeconds(10)); + assertThat(session.s).isNull(); + assertThat(session.isDisposed()).isFalse(); + + final ByteBuf keepAliveFrame = + KeepAliveFrameCodec.encode(transport.alloc(), false, 1529, Unpooled.EMPTY_BUFFER); + keepAliveSupport.receive(keepAliveFrame); + keepAliveFrame.release(); + + assertThat(transport.testConnection().isDisposed()).isTrue(); + // timeout should be terminated + assertThat(session.s).isNotNull(); + assertThat(session.isDisposed()).isTrue(); + + FrameAssert.assertThat(transport.testConnection().pollFrame()) + .hasStreamIdZero() + .typeOf(FrameType.ERROR) + .matches(ReferenceCounted::release); + + resumableDuplexConnection.onClose().as(StepVerifier::create).expectComplete().verify(); + + transport.alloc().assertHasNoLeaks(); + } +}