diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 89c1500bb..600b2ab08 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport; import io.netty.buffer.ByteBuf; +import io.netty.util.collection.IntObjectMap; import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -343,15 +344,15 @@ private void terminate(Throwable e) { } synchronized (this) { - activeStreams - .values() - .forEach( - receiver -> { - try { - receiver.handleError(e); - } catch (Throwable ignored) { - } - }); + for (IntObjectMap.PrimitiveEntry entry : activeStreams.entries()) { + FrameHandler handler = entry.value(); + if (handler != null) { + try { + handler.handleError(e); + } catch (Throwable ignored) { + } + } + } } if (e == CLOSED_CHANNEL_EXCEPTION) { diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index f0a052b93..969353bd6 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; +import io.netty.util.collection.IntObjectMap; import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -183,7 +184,12 @@ final void doOnDispose() { } private synchronized void cleanUpSendingSubscriptions() { - activeStreams.values().forEach(FrameHandler::handleCancel); + for (IntObjectMap.PrimitiveEntry entry : activeStreams.entries()) { + FrameHandler handler = entry.value(); + if (handler != null) { + handler.handleCancel(); + } + } activeStreams.clear(); }