Skip to content

Commit 19c2898

Browse files
authored
Merge pull request #1165 from ably/ECO-5514/fix-websocket-close
[ECO-5514] fix: improve WebSocket transport lifecycle and activity management
2 parents a40bb30 + f14844a commit 19c2898

4 files changed

Lines changed: 280 additions & 58 deletions

File tree

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.ably.lib.types.ClientOptions;
3232
import io.ably.lib.types.ConnectionDetails;
3333
import io.ably.lib.types.ErrorInfo;
34+
import io.ably.lib.types.Param;
3435
import io.ably.lib.types.ProtocolMessage;
3536
import io.ably.lib.types.ProtocolSerializer;
3637
import io.ably.lib.util.Log;
@@ -857,6 +858,23 @@ public void requestState(StateIndication state) {
857858
requestState(null, state);
858859
}
859860

861+
/**
862+
* Get query params representing the current authentication method and credentials.
863+
*/
864+
Param[] getAuthParams() throws AblyException {
865+
return ably.auth.getAuthParams();
866+
}
867+
868+
/**
869+
* Determines if the given WebSocketTransport instance is the currently active transport.
870+
*
871+
* @param transport the WebSocketTransport instance to check against the active transport
872+
* @return true if the provided transport is the currently active transport, false otherwise
873+
*/
874+
boolean isActiveTransport(WebSocketTransport transport) {
875+
return transport == this.transport;
876+
}
877+
860878
private synchronized void requestState(ITransport transport, StateIndication stateIndication) {
861879
Log.v(TAG, "requestState(): requesting " + stateIndication.state + "; id = " + connection.id);
862880
addAction(new AsynchronousStateChangeAction(transport, stateIndication));
@@ -2002,7 +2020,7 @@ private boolean isFatalError(ErrorInfo err) {
20022020
private ErrorInfo stateError;
20032021
private ConnectParams pendingConnect;
20042022
private boolean suppressRetry; /* for tests only; modified via reflection */
2005-
private ITransport transport;
2023+
private volatile ITransport transport;
20062024
private long suspendTime;
20072025
public long msgSerial;
20082026
private long lastActivity;

lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java

Lines changed: 98 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,11 @@ public class WebSocketTransport implements ITransport {
5252
private ConnectListener connectListener;
5353
private WebSocketClient webSocketClient;
5454
private final WebSocketEngine webSocketEngine;
55+
private WebSocketHandler webSocketHandler;
5556
private boolean activityCheckTurnedOff = false;
5657

58+
private boolean connectHasBeenCalled = false;
59+
5760
/******************
5861
* protected constructor
5962
******************/
@@ -94,22 +97,26 @@ private static WebSocketEngine createWebSocketEngine(TransportParams params) {
9497
* ITransport methods
9598
******************/
9699

100+
/**
101+
* Connect is called exactly once, when the transport is created;
102+
* after the transport is closed, `connect` is never called again
103+
*/
97104
@Override
98105
public void connect(ConnectListener connectListener) {
106+
ensureConnectCalledOnce();
99107
this.connectListener = connectListener;
100108
try {
101109
boolean isTls = params.options.tls;
102110
String wsScheme = isTls ? "wss://" : "ws://";
103111
wsUri = wsScheme + params.host + ':' + params.port + "/";
104-
Param[] authParams = connectionManager.ably.auth.getAuthParams();
112+
Param[] authParams = connectionManager.getAuthParams();
105113
Param[] connectParams = params.getConnectParams(authParams);
106114
if (connectParams.length > 0)
107115
wsUri = HttpUtils.encodeParams(wsUri, connectParams);
108116

109117
Log.d(TAG, "connect(); wsUri = " + wsUri);
110-
synchronized (this) {
111-
webSocketClient = this.webSocketEngine.create(wsUri, new WebSocketHandler(this::receive));
112-
}
118+
webSocketHandler = new WebSocketHandler(this::receive);
119+
webSocketClient = this.webSocketEngine.create(wsUri, webSocketHandler);
113120
webSocketClient.connect();
114121
} catch (AblyException e) {
115122
Log.e(TAG, "Unexpected exception attempting connection; wsUri = " + wsUri, e);
@@ -120,14 +127,36 @@ public void connect(ConnectListener connectListener) {
120127
}
121128
}
122129

130+
/**
131+
* Guards against `connect()` being called more than once; a second call throws IllegalStateException
132+
*/
133+
private synchronized void ensureConnectCalledOnce() {
134+
if (connectHasBeenCalled) throw new IllegalStateException("WebSocketTransport is already initialized");
135+
connectHasBeenCalled = true;
136+
}
137+
123138
@Override
124139
public void close() {
125140
Log.d(TAG, "close()");
126-
synchronized (this) {
127-
if (webSocketClient != null) {
128-
webSocketClient.close();
129-
webSocketClient = null;
130-
}
141+
// Take local snapshots of the shared references. Callback threads (e.g., onClose)
142+
// may concurrently set these fields to null.
143+
//
144+
// Intentionally avoid synchronizing here:
145+
// - The WebSocket library may invoke our WebSocketHandler while holding its own
146+
// internal locks.
147+
// - If close() also acquired a lock on WebSocketTransport, we could invert the
148+
// lock order and create a circular wait (deadlock): close() waits for the WS
149+
// library to release its lock, while the WS library waits for a lock on
150+
// WebSocketTransport.
151+
final WebSocketClient client = webSocketClient;
152+
final WebSocketHandler handler = webSocketHandler;
153+
if (client != null && handler != null) {
154+
// Record activity so the activity timer remains armed. If a graceful close
155+
// stalls, the timer can detect inactivity and force-cancel the socket.
156+
handler.flagActivity();
157+
client.close();
158+
} else {
159+
Log.w(TAG, "close() called on uninitialized or already closed transport");
131160
}
132161
}
133162

@@ -191,6 +220,11 @@ public String toString() {
191220
public String getURL() {
192221
return wsUri;
193222
}
223+
224+
private boolean isActiveTransport() {
225+
return connectionManager.isActiveTransport(this);
226+
}
227+
194228
//interface to transfer Protocol message from websocket
195229
interface WebSocketReceiver {
196230
void onMessage(ProtocolMessage protocolMessage) throws AblyException;
@@ -217,9 +251,14 @@ class WebSocketHandler implements WebSocketListener {
217251
* WsClient private members
218252
***************************/
219253

220-
private Timer timer = new Timer();
221-
private TimerTask activityTimerTask = null;
222-
private long lastActivityTime;
254+
private final Timer timer = new Timer();
255+
private volatile TimerTask activityTimerTask = null;
256+
private volatile long lastActivityTime;
257+
258+
/**
259+
* Lock object guarding the starting and clearing of the activity timer task
260+
*/
261+
private final Object activityTimerMonitor = new Object();
223262

224263
WebSocketHandler(WebSocketReceiver receiver) {
225264
this.receiver = receiver;
@@ -318,66 +357,68 @@ public void onOldJavaVersionDetected(Throwable throwable) {
318357
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", throwable);
319358
}
320359

321-
private synchronized void dispose() {
322-
/* dispose timer */
323-
try {
324-
timer.cancel();
325-
timer = null;
326-
} catch (IllegalStateException e) {
327-
}
360+
private void dispose() {
361+
timer.cancel();
328362
}
329363

330-
private synchronized void flagActivity() {
331-
lastActivityTime = System.currentTimeMillis();
332-
connectionManager.setLastActivity(lastActivityTime);
333-
if (activityTimerTask == null && connectionManager.maxIdleInterval != 0 && !activityCheckTurnedOff) {
334-
/* No timer currently running because previously there was no
335-
* maxIdleInterval configured, but now there is a
336-
* maxIdleInterval configured. Call checkActivity so a timer
337-
* gets started. This happens when flagActivity gets called
338-
* just after processing the connect message that configures
339-
* maxIdleInterval. */
340-
checkActivity();
364+
private void flagActivity() {
365+
if (isActiveTransport()) {
366+
lastActivityTime = System.currentTimeMillis();
367+
connectionManager.setLastActivity(lastActivityTime);
341368
}
369+
370+
if (connectionManager.maxIdleInterval == 0) {
371+
Log.v(TAG, "checkActivity: turned off because maxIdleInterval is 0");
372+
return;
373+
}
374+
375+
if (activityCheckTurnedOff) {
376+
Log.v(TAG, "checkActivity: turned off for test purpose");
377+
return;
378+
}
379+
380+
checkActivity();
342381
}
343382

344-
private synchronized void checkActivity() {
383+
private void checkActivity() {
345384
long timeout = getActivityTimeout();
385+
346386
if (timeout == 0) {
347387
Log.v(TAG, "checkActivity: infinite timeout");
348388
return;
349389
}
350390

351-
// Check if timer already running
352-
if (activityTimerTask != null) {
353-
return;
354-
}
391+
// fast path: skip the synchronized block if an activity timer task is already set
392+
if (activityTimerTask != null) return;
355393

356-
// Start the activity timer task
357-
startActivityTimer(timeout + 100);
394+
synchronized (activityTimerMonitor) {
395+
// Check if timer already running
396+
if (activityTimerTask == null) {
397+
// Start the activity timer task
398+
startActivityTimer(timeout + 100);
399+
}
400+
}
358401
}
359402

360-
private synchronized void startActivityTimer(long timeout) {
361-
if (activityTimerTask == null) {
362-
schedule((activityTimerTask = new TimerTask() {
363-
public void run() {
364-
try {
365-
onActivityTimerExpiry();
366-
} catch (Throwable t) {
367-
Log.e(TAG, "Unexpected exception in activity timer handler", t);
368-
}
403+
private void startActivityTimer(long timeout) {
404+
activityTimerTask = new TimerTask() {
405+
public void run() {
406+
try {
407+
onActivityTimerExpiry();
408+
} catch (Exception exception) {
409+
Log.e(TAG, "Unexpected exception in activity timer handler", exception);
410+
webSocketClient.cancel(ABNORMAL_CLOSE, "Activity timer closed unexpectedly");
369411
}
370-
}), timeout);
371-
}
412+
}
413+
};
414+
schedule(activityTimerTask, timeout);
372415
}
373416

374-
private synchronized void schedule(TimerTask task, long delay) {
375-
if (timer != null) {
376-
try {
377-
timer.schedule(task, delay);
378-
} catch (IllegalStateException ise) {
379-
Log.e(TAG, "Unexpected exception scheduling activity timer", ise);
380-
}
417+
private void schedule(TimerTask task, long delay) {
418+
try {
419+
timer.schedule(task, delay);
420+
} catch (IllegalStateException ise) {
421+
Log.w(TAG, "Timer has already has been canceled", ise);
381422
}
382423
}
383424

@@ -392,7 +433,7 @@ private void onActivityTimerExpiry() {
392433
return;
393434
}
394435

395-
synchronized (this) {
436+
synchronized (activityTimerMonitor) {
396437
activityTimerTask = null;
397438
// Otherwise, we've had some activity, restart the timer for the next timeout
398439
Log.v(TAG, "onActivityTimerExpiry: ok");
@@ -401,7 +442,7 @@ private void onActivityTimerExpiry() {
401442
}
402443

403444
private long getActivityTimeout() {
404-
return connectionManager.maxIdleInterval + connectionManager.ably.options.realtimeRequestTimeout;
445+
return connectionManager.maxIdleInterval + params.options.realtimeRequestTimeout;
405446
}
406447
}
407448

lib/src/test/java/io/ably/lib/test/common/Helpers.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,16 @@ public static boolean equalNullableStrings(String one, String two) {
969969
return (one == null) ? (two == null) : one.equals(two);
970970
}
971971

972+
public static void setPrivateField(Object object, String fieldName, Object value) {
973+
try {
974+
Field connectionStateField = object.getClass().getDeclaredField(fieldName);
975+
connectionStateField.setAccessible(true);
976+
connectionStateField.set(object, value);
977+
} catch (Exception e) {
978+
fail("Failed accessing " + fieldName + " with error " + e);
979+
}
980+
}
981+
972982
public static class RawHttpRequest {
973983
public String id;
974984
public URL url;

0 commit comments

Comments
 (0)