diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 72e5d756d5c..507db0228b4 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -27,7 +27,8 @@ public class ConfigUtils { public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; - + public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout"; + public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size"; public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency"; public static final String TIKV_TABLE_SCAN_CONCURRENCY = "tikv.table.scan_concurrency"; @@ -146,6 +147,8 @@ public class ConfigUtils { public static final String FOLLOWER = "FOLLOWER"; public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER"; + public static final int DEF_TIKV_GRPC_IDLE_TIMEOUT = 60; + public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 760266d7500..6f55ec29b9a 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -112,6 +112,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); + setIfMissing(TIKV_GRPC_IDLE_TIMEOUT, DEF_TIKV_GRPC_IDLE_TIMEOUT); setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); @@ -341,7 +342,7 @@ private static ReplicaRead getReplicaRead(String key) { private Optional rawKVBatchWriteSlowLogInMS = getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS); private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); - + private int idleTimeout = getInt(TIKV_GRPC_IDLE_TIMEOUT); private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE); private int circuitBreakAvailabilityWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS); @@ -666,6 +667,14 @@ public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) { this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS; } + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int timeout) { + this.idleTimeout = timeout; + } + public int getRawKVReadTimeoutInMS() { return rawKVReadTimeoutInMS; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 3ca4d2b5a84..52960ce6852 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -77,7 +77,7 @@ public TiSession(TiConfiguration conf) { this.metricsServer = MetricsServer.getInstance(conf); this.conf = conf; - this.channelFactory = new ChannelFactory(conf.getMaxFrameSize()); + this.channelFactory = new ChannelFactory(conf.getMaxFrameSize(), conf.getIdleTimeout()); this.client = PDClient.createRaw(conf, channelFactory); this.enableGrpcForward = conf.getEnableGrpcForward(); if (this.enableGrpcForward) { diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index f6767d7e602..63da5be549d 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -25,10 +25,12 @@ public class ChannelFactory implements AutoCloseable { private final int maxFrameSize; + private final int idleTimeout; private final ConcurrentHashMap connPool = new ConcurrentHashMap<>(); - public ChannelFactory(int maxFrameSize) { + public ChannelFactory(int maxFrameSize, int idleTimeout) { this.maxFrameSize = maxFrameSize; + this.idleTimeout = idleTimeout; } public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { @@ -52,7 +54,7 @@ public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { return ManagedChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort()) .maxInboundMessageSize(maxFrameSize) .usePlaintext() - .idleTimeout(60, TimeUnit.SECONDS) + .idleTimeout(idleTimeout, TimeUnit.SECONDS) .build(); }); } diff --git a/src/test/java/org/tikv/common/TiConfigurationTest.java b/src/test/java/org/tikv/common/TiConfigurationTest.java index 64512ba1e9d..378c73ce3b3 100644 --- a/src/test/java/org/tikv/common/TiConfigurationTest.java +++ b/src/test/java/org/tikv/common/TiConfigurationTest.java @@ -26,4 +26,15 @@ public void configFileTest() { TiConfiguration conf = TiConfiguration.createRawDefault(); assertEquals("configFileTest", conf.getDBPrefix()); } + + @Test + public void testGrpcIdleTimeoutValue() { + TiConfiguration conf = TiConfiguration.createDefault(); + // default value + assertEquals(TiConfiguration.getInt(ConfigUtils.TIKV_GRPC_IDLE_TIMEOUT), conf.getIdleTimeout()); + // new value + int newValue = 100000; + conf.setIdleTimeout(newValue); + assertEquals(newValue, conf.getIdleTimeout()); + } }