diff --git a/build.gradle b/build.gradle index e8dbaedd0..5be434a4a 100644 --- a/build.gradle +++ b/build.gradle @@ -16,10 +16,11 @@ plugins { id 'com.github.sherter.google-java-format' version '0.9' apply false - id 'me.champeau.jmh' version '0.6.4' apply false + id 'me.champeau.jmh' version '0.6.6' apply false id 'io.spring.dependency-management' version '1.0.11.RELEASE' apply false id 'io.morethan.jmhreport' version '0.9.0' apply false - id "io.github.reyerizo.gradle.jcstress" version "0.8.11" apply false + id 'io.github.reyerizo.gradle.jcstress' version '0.8.11' apply false + id 'com.github.vlsi.gradle-extensions' version '1.76' apply false } boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bamboo_planKey", "GITHUB_ACTION"].with { @@ -30,19 +31,20 @@ boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bam subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' + apply plugin: 'com.github.vlsi.gradle-extensions' - ext['reactor-bom.version'] = '2020.0.7' + ext['reactor-bom.version'] = '2020.0.12' ext['logback.version'] = '1.2.3' - ext['netty-bom.version'] = '4.1.64.Final' - ext['netty-boringssl.version'] = '2.0.39.Final' + ext['netty-bom.version'] = '4.1.70.Final' + ext['netty-boringssl.version'] = '2.0.45.Final' ext['hdrhistogram.version'] = '2.1.12' - ext['mockito.version'] = '3.10.0' + ext['mockito.version'] = '3.12.4' ext['slf4j.version'] = '1.7.30' - ext['jmh.version'] = '1.31' - ext['junit.version'] = '5.7.2' + ext['jmh.version'] = '1.33' + ext['junit.version'] = '5.8.1' ext['hamcrest.version'] = '1.3' - ext['micrometer.version'] = '1.6.7' - ext['assertj.version'] = '3.19.0' + ext['micrometer.version'] = '1.7.5' + ext['assertj.version'] = '3.21.0' ext['netflix.limits.version'] = '0.3.6' ext['bouncycastle-bcpkix.version'] = '1.68' @@ -123,6 +125,7 @@ subprojects { } plugins.withType(JavaPlugin) { + compileJava { sourceCompatibility = 1.8 @@ -198,7 +201,7 @@ subprojects { if (JavaVersion.current().isJava9Compatible()) { println "Java 9+: lowering MaxGCPauseMillis to 20ms in ${project.name} ${name}" println "Java 9+: enabling leak detection [ADVANCED]" - jvmArgs = ["-XX:MaxGCPauseMillis=20", "-Dio.netty.leakDetection.level=ADVANCED"] + jvmArgs = ["-XX:MaxGCPauseMillis=20", "-Dio.netty.leakDetection.level=ADVANCED", "-Dio.netty.leakDetection.samplingInterval=32"] } systemProperty("java.awt.headless", "true") diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 0f80bbf51..e750102e0 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index c3278a09c..c529b615d 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -146,10 +146,7 @@ public void onNextPrioritized(ByteBuf t) { return; } - if (isWorkInProgress(previousState) - || isCancelled(previousState) - || isDisposed(previousState) - || isTerminated(previousState)) { + if (isWorkInProgress(previousState)) { return; } @@ -185,10 +182,7 @@ public void onNext(ByteBuf t) { return; } - if (isWorkInProgress(previousState) - || isCancelled(previousState) - || isDisposed(previousState) - || isTerminated(previousState)) { + if (isWorkInProgress(previousState)) { return; } @@ -449,6 +443,7 @@ public void subscribe(CoreSubscriber actual) { if (isCancelled(previousState)) { clearAndFinalize(this); + return; } if (isDisposed(previousState)) { diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index 8dcf67a0b..2f2f29001 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.CoreSubscriber; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.util.function.Tuple2; @@ -58,6 +59,7 @@ public class ClientRSocketSession final boolean cleanupStoreOnKeepAlive; final ByteBuf resumeToken; final String session; + final Disposable reconnectDisposable; volatile Subscription s; static final AtomicReferenceFieldUpdater S = @@ -110,10 +112,22 @@ public ClientRSocketSession( this.resumableConnection = resumableDuplexConnection; resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe(); - resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect); + + this.reconnectDisposable = + resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect); } void reconnect(int index) { + if (this.s == Operators.cancelledSubscription()) { + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting rejected since session is closed", + session, + index); + } + return; + } + keepAliveSupport.stop(); if (logger.isDebugEnabled()) { logger.debug( @@ -147,6 +161,8 @@ public void onImpliedPosition(long remoteImpliedPos) { @Override public void dispose() { Operators.terminate(S, this); + + reconnectDisposable.dispose(); resumableConnection.dispose(); resumableFramesStore.dispose(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 933ac09ca..7ade3e59b 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -253,7 +253,7 @@ void dispose(@Nullable Throwable e) { framesSaverDisposable.dispose(); activeReceivingSubscriber.dispose(); - savableFramesSender.dispose(); + savableFramesSender.onComplete(); onConnectionClosedSink.tryEmitComplete(); if (e != null) { diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 902a5844c..570a7de2f 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -57,6 +57,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.Fuseable; import reactor.core.publisher.Flux; @@ -641,7 +642,11 @@ public String expectedPayloadMetadata() { } public void awaitClosed() { - server.onClose().and(client.onClose()).block(Duration.ofMinutes(1)); + server + .onClose() + .onErrorResume(__ -> Mono.empty()) + .and(client.onClose().onErrorResume(__ -> Mono.empty())) + .block(Duration.ofMinutes(1)); } private static class AsyncDuplexConnection implements DuplexConnection { @@ -706,6 +711,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection { private final String tag; final DuplexConnection source; final Duration delay; + final Disposable.Swap disposables = Disposables.swap(); DisconnectingDuplexConnection(String tag, DuplexConnection source, Duration delay) { this.tag = tag; @@ -715,6 +721,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection { @Override public void dispose() { + disposables.dispose(); source.dispose(); } @@ -743,14 +750,15 @@ public Flux receive() { bb -> { if (!receivedFirst) { receivedFirst = true; - Mono.delay(delay) - .takeUntilOther(source.onClose()) - .subscribe( - __ -> { - logger.warn( - "Tag {}. Disposing Connection[{}]", tag, source.hashCode()); - source.dispose(); - }); + disposables.replace( + Mono.delay(delay) + .takeUntilOther(source.onClose()) + .subscribe( + __ -> { + logger.warn( + "Tag {}. Disposing Connection[{}]", tag, source.hashCode()); + source.dispose(); + })); } }); }