Skip to content

Commit 173eeda

Browse files
TiSession support graceful close
Signed-off-by: marsishandsome <marsishandsome@gmail.com>
1 parent 798244c commit 173eeda

File tree

2 files changed

+250
-21
lines changed

2 files changed

+250
-21
lines changed

src/main/java/org/tikv/common/TiSession.java

Lines changed: 126 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class TiSession implements AutoCloseable {
7070
private volatile boolean enableGrpcForward;
7171
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
7272
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
73-
private boolean isClosed = false;
73+
private volatile boolean isClosed = false;
7474
private MetricsServer metricsServer;
7575
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;
7676

@@ -106,22 +106,30 @@ public static TiSession getInstance(TiConfiguration conf) {
106106
}
107107

108108
public RawKVClient createRawClient() {
109+
checkIsClosed();
110+
109111
RegionStoreClientBuilder builder =
110112
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
111113
return new RawKVClient(this, builder);
112114
}
113115

114116
public KVClient createKVClient() {
117+
checkIsClosed();
118+
115119
RegionStoreClientBuilder builder =
116120
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
117121
return new KVClient(conf, builder);
118122
}
119123

120124
public TxnKVClient createTxnClient() {
125+
checkIsClosed();
126+
121127
return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
122128
}
123129

124130
public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() {
131+
checkIsClosed();
132+
125133
RegionStoreClient.RegionStoreClientBuilder res = clientBuilder;
126134
if (res == null) {
127135
synchronized (this) {
@@ -137,6 +145,8 @@ public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder()
137145
}
138146

139147
public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
148+
checkIsClosed();
149+
140150
ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
141151
if (res == null) {
142152
synchronized (this) {
@@ -156,18 +166,26 @@ public TiConfiguration getConf() {
156166
}
157167

158168
public TiTimestamp getTimestamp() {
169+
checkIsClosed();
170+
159171
return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
160172
}
161173

162174
public Snapshot createSnapshot() {
175+
checkIsClosed();
176+
163177
return new Snapshot(getTimestamp(), this);
164178
}
165179

166180
public Snapshot createSnapshot(TiTimestamp ts) {
181+
checkIsClosed();
182+
167183
return new Snapshot(ts, this);
168184
}
169185

170186
public PDClient getPDClient() {
187+
checkIsClosed();
188+
171189
PDClient res = client;
172190
if (res == null) {
173191
synchronized (this) {
@@ -181,6 +199,8 @@ public PDClient getPDClient() {
181199
}
182200

183201
public Catalog getCatalog() {
202+
checkIsClosed();
203+
184204
Catalog res = catalog;
185205
if (res == null) {
186206
synchronized (this) {
@@ -194,6 +214,8 @@ public Catalog getCatalog() {
194214
}
195215

196216
public RegionManager getRegionManager() {
217+
checkIsClosed();
218+
197219
RegionManager res = regionManager;
198220
if (res == null) {
199221
synchronized (this) {
@@ -207,6 +229,8 @@ public RegionManager getRegionManager() {
207229
}
208230

209231
public ExecutorService getThreadPoolForIndexScan() {
232+
checkIsClosed();
233+
210234
ExecutorService res = indexScanThreadPool;
211235
if (res == null) {
212236
synchronized (this) {
@@ -226,6 +250,8 @@ public ExecutorService getThreadPoolForIndexScan() {
226250
}
227251

228252
public ExecutorService getThreadPoolForTableScan() {
253+
checkIsClosed();
254+
229255
ExecutorService res = tableScanThreadPool;
230256
if (res == null) {
231257
synchronized (this) {
@@ -242,6 +268,8 @@ public ExecutorService getThreadPoolForTableScan() {
242268
}
243269

244270
public ExecutorService getThreadPoolForBatchPut() {
271+
checkIsClosed();
272+
245273
ExecutorService res = batchPutThreadPool;
246274
if (res == null) {
247275
synchronized (this) {
@@ -261,6 +289,8 @@ public ExecutorService getThreadPoolForBatchPut() {
261289
}
262290

263291
public ExecutorService getThreadPoolForBatchGet() {
292+
checkIsClosed();
293+
264294
ExecutorService res = batchGetThreadPool;
265295
if (res == null) {
266296
synchronized (this) {
@@ -280,6 +310,8 @@ public ExecutorService getThreadPoolForBatchGet() {
280310
}
281311

282312
public ExecutorService getThreadPoolForBatchDelete() {
313+
checkIsClosed();
314+
283315
ExecutorService res = batchDeleteThreadPool;
284316
if (res == null) {
285317
synchronized (this) {
@@ -299,6 +331,8 @@ public ExecutorService getThreadPoolForBatchDelete() {
299331
}
300332

301333
public ExecutorService getThreadPoolForBatchScan() {
334+
checkIsClosed();
335+
302336
ExecutorService res = batchScanThreadPool;
303337
if (res == null) {
304338
synchronized (this) {
@@ -318,6 +352,8 @@ public ExecutorService getThreadPoolForBatchScan() {
318352
}
319353

320354
public ExecutorService getThreadPoolForDeleteRange() {
355+
checkIsClosed();
356+
321357
ExecutorService res = deleteRangeThreadPool;
322358
if (res == null) {
323359
synchronized (this) {
@@ -338,6 +374,8 @@ public ExecutorService getThreadPoolForDeleteRange() {
338374

339375
@VisibleForTesting
340376
public ChannelFactory getChannelFactory() {
377+
checkIsClosed();
378+
341379
return channelFactory;
342380
}
343381

@@ -347,6 +385,8 @@ public ChannelFactory getChannelFactory() {
347385
* @return a SwitchTiKVModeClient
348386
*/
349387
public SwitchTiKVModeClient getSwitchTiKVModeClient() {
388+
checkIsClosed();
389+
350390
return new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
351391
}
352392

@@ -363,6 +403,8 @@ public void splitRegionAndScatter(
363403
int splitRegionBackoffMS,
364404
int scatterRegionBackoffMS,
365405
int scatterWaitMS) {
406+
checkIsClosed();
407+
366408
logger.info(String.format("split key's size is %d", splitKeys.size()));
367409
long startMS = System.currentTimeMillis();
368410

@@ -412,6 +454,8 @@ public void splitRegionAndScatter(
412454
* @param splitKeys
413455
*/
414456
public void splitRegionAndScatter(List<byte[]> splitKeys) {
457+
checkIsClosed();
458+
415459
int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
416460
int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
417461
int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
@@ -475,50 +519,111 @@ private List<Metapb.Region> splitRegion(
475519
return regions;
476520
}
477521

478-
@Override
479-
public synchronized void close() throws Exception {
522+
private void checkIsClosed() {
480523
if (isClosed) {
481-
logger.warn("this TiSession is already closed!");
482-
return;
524+
throw new RuntimeException("this TiSession is closed!");
525+
}
526+
}
527+
528+
public synchronized void closeAwaitTermination(long timeoutMS) throws Exception {
529+
shutdown(false);
530+
531+
long startMS = System.currentTimeMillis();
532+
while (true) {
533+
if (isTerminatedExecutorServices()) {
534+
cleanAfterTerminated();
535+
return;
536+
}
537+
538+
if (System.currentTimeMillis() - startMS > timeoutMS) {
539+
shutdown(true);
540+
return;
541+
}
542+
Thread.sleep(500);
483543
}
544+
}
545+
546+
@Override
547+
public synchronized void close() throws Exception {
548+
shutdown(true);
549+
}
484550

485-
if (metricsServer != null) {
486-
metricsServer.close();
551+
private synchronized void shutdown(boolean now) throws Exception {
552+
if (!isClosed) {
553+
isClosed = true;
554+
synchronized (sessionCachedMap) {
555+
sessionCachedMap.remove(conf.getPdAddrsString());
556+
}
557+
558+
if (metricsServer != null) {
559+
metricsServer.close();
560+
}
487561
}
488562

489-
isClosed = true;
490-
synchronized (sessionCachedMap) {
491-
sessionCachedMap.remove(conf.getPdAddrsString());
563+
if (now) {
564+
shutdownNowExecutorServices();
565+
cleanAfterTerminated();
566+
} else {
567+
shutdownExecutorServices();
492568
}
569+
}
570+
571+
private synchronized void cleanAfterTerminated() throws InterruptedException {
493572
if (regionManager != null) {
494573
regionManager.close();
495574
}
575+
if (client != null) {
576+
client.close();
577+
}
578+
if (catalog != null) {
579+
catalog.close();
580+
}
581+
}
582+
583+
private List<ExecutorService> getExecutorServices() {
584+
List<ExecutorService> executorServiceList = new ArrayList<>();
496585
if (tableScanThreadPool != null) {
497-
tableScanThreadPool.shutdownNow();
586+
executorServiceList.add(tableScanThreadPool);
498587
}
499588
if (indexScanThreadPool != null) {
500-
indexScanThreadPool.shutdownNow();
589+
executorServiceList.add(indexScanThreadPool);
501590
}
502591
if (batchGetThreadPool != null) {
503-
batchGetThreadPool.shutdownNow();
592+
executorServiceList.add(batchGetThreadPool);
504593
}
505594
if (batchPutThreadPool != null) {
506-
batchPutThreadPool.shutdownNow();
595+
executorServiceList.add(batchPutThreadPool);
507596
}
508597
if (batchDeleteThreadPool != null) {
509-
batchDeleteThreadPool.shutdownNow();
598+
executorServiceList.add(batchDeleteThreadPool);
510599
}
511600
if (batchScanThreadPool != null) {
512-
batchScanThreadPool.shutdownNow();
601+
executorServiceList.add(batchScanThreadPool);
513602
}
514603
if (deleteRangeThreadPool != null) {
515-
deleteRangeThreadPool.shutdownNow();
604+
executorServiceList.add(deleteRangeThreadPool);
516605
}
517-
if (client != null) {
518-
getPDClient().close();
606+
return executorServiceList;
607+
}
608+
609+
private void shutdownExecutorServices() {
610+
for (ExecutorService executorService : getExecutorServices()) {
611+
executorService.shutdown();
519612
}
520-
if (catalog != null) {
521-
getCatalog().close();
613+
}
614+
615+
private void shutdownNowExecutorServices() {
616+
for (ExecutorService executorService : getExecutorServices()) {
617+
executorService.shutdownNow();
618+
}
619+
}
620+
621+
private boolean isTerminatedExecutorServices() {
622+
for (ExecutorService executorService : getExecutorServices()) {
623+
if (!executorService.isTerminated()) {
624+
return false;
625+
}
522626
}
627+
return true;
523628
}
524629
}

0 commit comments

Comments
 (0)