Skip to content

Commit 71eb82d

Browse files
author
MDH
committed
Fix ehcache reconnect
1 parent 900433f commit 71eb82d

18 files changed

Lines changed: 674 additions & 531 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Terracotta, Inc.
3+
* Copyright IBM Corp. 2024, 2025
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.ehcache.clustered.client.internal.reconnect;
18+
19+
import org.ehcache.clustered.client.config.Timeouts;
20+
import org.ehcache.clustered.client.internal.service.ClusterTierException;
21+
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
22+
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
23+
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
24+
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
25+
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
26+
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
27+
28+
import java.util.concurrent.TimeoutException;
29+
30+
public class FailedReconnectClusterTierClientEntity implements ClusterTierClientEntity {
31+
private final String cacheId;
32+
private final Throwable failure;
33+
34+
public FailedReconnectClusterTierClientEntity(String cacheId, Throwable failure) {
35+
this.cacheId = cacheId;
36+
this.failure = failure;
37+
}
38+
39+
public String getCacheId() {
40+
return cacheId;
41+
}
42+
43+
@Override
44+
public Timeouts getTimeouts() {
45+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
46+
}
47+
48+
@Override
49+
public boolean isConnected() {
50+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
51+
}
52+
53+
@Override
54+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierException, TimeoutException {
55+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
56+
}
57+
58+
@Override
59+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
60+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
61+
}
62+
63+
@Override
64+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
65+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
66+
}
67+
68+
@Override
69+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
70+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
71+
}
72+
73+
@Override
74+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
75+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
76+
}
77+
78+
@Override
79+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
80+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
81+
}
82+
83+
@Override
84+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
85+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
86+
}
87+
88+
@Override
89+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
90+
//ignore
91+
}
92+
93+
@Override
94+
public void addReconnectListener(ReconnectListener reconnectListener) {
95+
//ignore
96+
}
97+
98+
@Override
99+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
100+
//ignore
101+
}
102+
103+
@Override
104+
public void close() {
105+
//ignore
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright Terracotta, Inc.
3+
* Copyright IBM Corp. 2024, 2025
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.ehcache.clustered.client.internal.reconnect;
18+
19+
import org.ehcache.clustered.client.config.Timeouts;
20+
import org.ehcache.clustered.client.internal.service.ClusterTierException;
21+
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
22+
import org.ehcache.clustered.client.internal.store.ReconnectInProgressException;
23+
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
24+
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
25+
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
26+
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
27+
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import org.terracotta.exception.ConnectionClosedException;
31+
import org.terracotta.exception.ConnectionShutdownException;
32+
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.TimeoutException;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
40+
import static org.ehcache.core.util.ExceptionUtil.containsCause;
41+
42+
public class ReconnectableClusterTierClientEntity implements ClusterTierClientEntity {
43+
private static final Logger LOGGER = LoggerFactory.getLogger(ReconnectableClusterTierClientEntity.class);
44+
45+
private final AtomicReference<ClusterTierClientEntity> delegateRef = new AtomicReference<>();
46+
private final Runnable runnable;
47+
48+
private final Map<Class<? extends EhcacheEntityResponse>, List<ResponseListener<? extends EhcacheEntityResponse>>> responseListeners =
49+
new ConcurrentHashMap<>();
50+
private final List<DisconnectionListener> disconnectionListeners = new CopyOnWriteArrayList<>();
51+
private final List<ReconnectListener> reconnectListeners = new CopyOnWriteArrayList<>();
52+
private volatile boolean enableEventing = false;
53+
54+
public ReconnectableClusterTierClientEntity(ClusterTierClientEntity clientEntity, Runnable runnable) {
55+
delegateRef.set(clientEntity);
56+
this.runnable = runnable;
57+
}
58+
59+
private ClusterTierClientEntity delegate() {
60+
return delegateRef.get();
61+
}
62+
63+
@SuppressWarnings("unchecked")
64+
public void setDelegateRef(ClusterTierClientEntity clientEntity) {
65+
responseListeners.forEach((k, v) -> {
66+
v.forEach(resp -> clientEntity.addResponseListener(k, (ClusterTierClientEntity.ResponseListener)resp));
67+
});
68+
disconnectionListeners.forEach(clientEntity::addDisconnectionListener);
69+
reconnectListeners.forEach(clientEntity::addReconnectListener);
70+
delegateRef.set(clientEntity);
71+
}
72+
73+
public boolean enableEventing() {
74+
return enableEventing;
75+
}
76+
77+
@Override
78+
public Timeouts getTimeouts() {
79+
return delegate().getTimeouts();
80+
}
81+
82+
@Override
83+
public boolean isConnected() {
84+
return delegate().isConnected();
85+
}
86+
87+
@Override
88+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws TimeoutException {
89+
try {
90+
onReconnect(clientEntity -> {
91+
try {
92+
clientEntity.validate(clientStoreConfiguration);
93+
} catch (ClusterTierException e) {
94+
throw new RuntimeException(e);
95+
}
96+
return null;
97+
});
98+
} catch (ClusterException e) {
99+
throw new RuntimeException(e);
100+
}
101+
}
102+
103+
@Override
104+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
105+
onReconnect(clientEntity -> {
106+
clientEntity.invokeAndWaitForSend(message, track);
107+
return null;
108+
});
109+
}
110+
111+
@Override
112+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
113+
onReconnect(clientEntity -> {
114+
clientEntity.invokeAndWaitForRetired(message, track);
115+
return null;
116+
});
117+
}
118+
119+
@Override
120+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
121+
return onReconnect(clientEntity -> clientEntity.invokeAndWaitForComplete(message, track));
122+
}
123+
124+
@Override
125+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
126+
return onReconnect(clientEntity -> clientEntity.invokeAndWaitForRetired(message, track));
127+
}
128+
129+
@Override
130+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
131+
return onReconnect(clientEntity -> clientEntity.invokeStateRepositoryOperation(message, track));
132+
}
133+
134+
@Override
135+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
136+
delegate().addResponseListener(responseType, responseListener);
137+
138+
List<ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(responseType);
139+
if (responseListeners == null) {
140+
responseListeners = new CopyOnWriteArrayList<>();
141+
this.responseListeners.put(responseType, responseListeners);
142+
}
143+
responseListeners.add(responseListener);
144+
}
145+
146+
@Override
147+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
148+
delegate().addDisconnectionListener(disconnectionListener);
149+
disconnectionListeners.add(disconnectionListener);
150+
}
151+
152+
@Override
153+
public void addReconnectListener(ReconnectListener reconnectListener) {
154+
delegate().addReconnectListener(reconnectListener);
155+
reconnectListeners.add(reconnectListener);
156+
}
157+
158+
@Override
159+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
160+
onReconnect(clientEntity -> {
161+
clientEntity.enableEvents(enable);
162+
return null;
163+
});
164+
enableEventing = enable;
165+
}
166+
167+
@Override
168+
public void close() {
169+
try {
170+
delegate().close();
171+
} catch (Throwable t) {
172+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
173+
LOGGER.debug("Store was already closed, since connection was closed");
174+
} else {
175+
throw t;
176+
}
177+
}
178+
}
179+
180+
private <T> T onReconnect(TimeoutAndClusterExceptionFunction<ClusterTierClientEntity, T> function) throws TimeoutException, ClusterException {
181+
ClusterTierClientEntity cl = delegate();
182+
try {
183+
return function.apply(cl);
184+
} catch (Exception sspe) {
185+
if (containsCause(sspe, ConnectionClosedException.class)) {
186+
if (delegateRef.compareAndSet(cl, new ReconnectInProgressClusterTierClientEntity())) {
187+
runnable.run();
188+
}
189+
return onReconnect(function);
190+
} else {
191+
throw sspe;
192+
}
193+
}
194+
}
195+
196+
@FunctionalInterface
197+
private interface TimeoutAndClusterExceptionFunction<U, V> {
198+
V apply(U u) throws TimeoutException, ClusterException;
199+
}
200+
201+
private static class ReconnectInProgressClusterTierClientEntity implements ClusterTierClientEntity {
202+
@Override
203+
public Timeouts getTimeouts() {
204+
throw new ReconnectInProgressException();
205+
}
206+
207+
@Override
208+
public boolean isConnected() {
209+
throw new ReconnectInProgressException();
210+
}
211+
212+
@Override
213+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierException, TimeoutException {
214+
throw new ReconnectInProgressException();
215+
}
216+
217+
@Override
218+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
219+
throw new ReconnectInProgressException();
220+
}
221+
222+
@Override
223+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
224+
throw new ReconnectInProgressException();
225+
}
226+
227+
@Override
228+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
229+
throw new ReconnectInProgressException();
230+
}
231+
232+
@Override
233+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
234+
throw new ReconnectInProgressException();
235+
}
236+
237+
@Override
238+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
239+
throw new ReconnectInProgressException();
240+
}
241+
242+
@Override
243+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
244+
throw new ReconnectInProgressException();
245+
}
246+
247+
@Override
248+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
249+
throw new ReconnectInProgressException();
250+
}
251+
252+
@Override
253+
public void addReconnectListener(ReconnectListener reconnectListener) {
254+
throw new ReconnectInProgressException();
255+
}
256+
257+
@Override
258+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
259+
throw new ReconnectInProgressException();
260+
}
261+
262+
@Override
263+
public void close() {
264+
}
265+
}
266+
}

0 commit comments

Comments
 (0)