Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti

private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;

enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN }
enum ConnectionHandshake { READY, ACTIVE, DRAINING, GRACEFUL_SHUTDOWN, SHUTDOWN }
enum SettingsHandshake { READY, TRANSMITTED, ACKED }

private final ProtocolIOSession ioSession;
Expand Down Expand Up @@ -142,6 +142,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }

private EndpointDetails endpointDetails;
private boolean goAwayReceived;
private int shutdownLastStreamId;
private int lastProcessedRemoteStreamId;
private boolean drainPingSent;

private volatile boolean peerNoRfc7540Priorities;

Expand Down Expand Up @@ -201,6 +204,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
this.streamListener = streamListener;
this.lastActivityTime = System.currentTimeMillis();
this.validateAfterInactivity = validateAfterInactivity;
this.shutdownLastStreamId = 0;
this.lastProcessedRemoteStreamId = 0;
this.drainPingSent = false;
}

@Override
Expand Down Expand Up @@ -506,6 +512,13 @@ public final void onOutput() throws HttpException, IOException {
ioSession.getLock().unlock();
}

if (connState == ConnectionHandshake.DRAINING && !drainPingSent && outputBuffer.isEmpty() && outputQueue.isEmpty()) {
drainPingSent = true;
executePing(new PingCommand(createGracefulShutdownPingHandler()));
// Return early so the PING frame is flushed on the next onOutput cycle
return;
}

if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {

if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
Expand Down Expand Up @@ -589,16 +602,16 @@ public final void onOutput() throws HttpException, IOException {
streams.dropStreamId(stream.getId());
it.remove();
} else {
if (streams.isSameSide(stream.getId()) || stream.getId() <= streams.getLastRemoteId()) {
if (streams.isSameSide(stream.getId()) || shutdownLastStreamId == 0 || stream.getId() <= shutdownLastStreamId) {
liveStreams++;
}
}
}
if (liveStreams == 0) {
if (shutdownLastStreamId != Integer.MAX_VALUE && liveStreams == 0) {
connState = ConnectionHandshake.SHUTDOWN;
}
}
if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
if (connState.compareTo(ConnectionHandshake.DRAINING) >= 0) {
for (;;) {
final Command command = ioSession.poll();
if (command == null) {
Expand Down Expand Up @@ -628,6 +641,11 @@ public final void onOutput() throws HttpException, IOException {
}

public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
if (connState == ConnectionHandshake.DRAINING) {
completeGracefulShutdown();
return;
}

connState = ConnectionHandshake.SHUTDOWN;

final RawFrame goAway;
Expand Down Expand Up @@ -663,13 +681,55 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc
if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
streams.shutdownAndReleaseAll();
connState = ConnectionHandshake.SHUTDOWN;
} else {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
final RawFrame goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
}
return;
}
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
shutdownLastStreamId = Integer.MAX_VALUE;
drainPingSent = false;
final RawFrame goAway = frameFactory.createGoAway(shutdownLastStreamId, H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
connState = ConnectionHandshake.DRAINING;
requestSessionOutput();
}
}

private void completeGracefulShutdown() throws IOException {
if (connState != ConnectionHandshake.DRAINING) {
return;
}
shutdownLastStreamId = lastProcessedRemoteStreamId;
final RawFrame goAway = frameFactory.createGoAway(shutdownLastStreamId, H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
}

private AsyncPingHandler createGracefulShutdownPingHandler() {
final ByteBuffer data = ByteBuffer.allocate(8);
data.putLong(System.nanoTime());
data.flip();
return new AsyncPingHandler() {

@Override
public ByteBuffer getData() {
return data.asReadOnlyBuffer();
}

@Override
public void consumeResponse(final ByteBuffer feedback) throws IOException {
if (connState == ConnectionHandshake.DRAINING) {
completeGracefulShutdown();
}
}

@Override
public void failed(final Exception cause) {
}

@Override
public void cancel() {
}

};
}

private void executePing(final PingCommand pingCommand) throws IOException {
Expand Down Expand Up @@ -817,8 +877,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
}

final H2StreamChannel channel = createChannel(streamId);
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
if (connState.compareTo(ConnectionHandshake.DRAINING) <= 0) {
stream = streams.createActive(channel, incomingRequest(channel));
lastProcessedRemoteStreamId = Math.max(lastProcessedRemoteStreamId, streamId);
streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams());
} else {
channel.localReset(H2Error.REFUSED_STREAM);
Expand Down Expand Up @@ -1026,8 +1087,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio

final H2StreamChannel channel = createChannel(promisedStreamId);
final H2Stream promisedStream;
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
if (connState.compareTo(ConnectionHandshake.DRAINING) <= 0) {
promisedStream = streams.createReserved(channel, incomingPushPromise(channel, stream.getPushHandlerFactory()));
lastProcessedRemoteStreamId = Math.max(lastProcessedRemoteStreamId, promisedStreamId);
} else {
channel.localReset(H2Error.REFUSED_STREAM);
promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE);
Expand All @@ -1053,17 +1115,18 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
final int errorCode = payload.getInt();
goAwayReceived = true;
if (errorCode == H2Error.NO_ERROR.getCode()) {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
final int activeStreamId = stream.getId();
if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
stream.fail(new RequestNotExecutedException());
it.remove();
}
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
final int activeStreamId = stream.getId();
if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
stream.fail(new RequestNotExecutedException());
it.remove();
}
}
connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
if (connState != ConnectionHandshake.DRAINING) {
shutdownLastStreamId = processedLocalStreamId;
connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
}
} else {
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
Expand Down
Loading
Loading