Skip to content
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class ConfigUtils {
public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms";
public static final String TIKV_GRPC_INGEST_TIMEOUT = "tikv.grpc.ingest_timeout_in_ms";
public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms";
public static final String TIKV_GRPC_WARM_UP_TIMEOUT = "tikv.grpc.warm_up_timeout_in_ms";
public static final String TIKV_PD_FIRST_GET_MEMBER_TIMEOUT =
"tikv.grpc.pd_first_get_member_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms";
Expand Down Expand Up @@ -111,11 +112,15 @@ public class ConfigUtils {
public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT =
"tikv.circuit_break.trigger.attempt_request_count";

public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit";

public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
public static final String DEF_TIKV_GRPC_WARM_UP_TIMEOUT = "5000ms";
public static final String DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT = "10000ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
Expand Down Expand Up @@ -182,11 +187,14 @@ public class ConfigUtils {
public static final boolean DEF_TIKV_TLS_ENABLE = false;
public static final boolean DEF_TIKV_USE_JKS = false;
public static final boolean DEF_TIFLASH_ENABLE = false;
public static final boolean DEF_TIKV_WARM_UP_ENABLE = true;

public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10;
public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20;
public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;

public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000;
}
21 changes: 21 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,27 @@ public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
}

@Override
public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
// no need to backoff because ScanRegions is just for optimization
// introduce a warm-up timeout for ScanRegions requests
PDGrpc.PDBlockingStub stub =
getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
Pdpb.ScanRegionsRequest request =
Pdpb.ScanRegionsRequest.newBuilder()
.setHeader(header)
.setStartKey(startKey)
.setEndKey(endKey)
.setLimit(limit)
.build();
Pdpb.ScanRegionsResponse resp = stub.scanRegions(request);
if (resp == null) {
return null;
}
return resp.getRegionsList();
}

private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Pdpb;

/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
public interface ReadOnlyPDClient {
Expand All @@ -48,6 +49,9 @@ public interface ReadOnlyPDClient {
*/
Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id);

List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit);

HostMapping getHostMapping();

/**
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_INGEST_TIMEOUT, DEF_TIKV_GRPC_INGEST_TIMEOUT);
setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT);
setIfMissing(TIKV_GRPC_WARM_UP_TIMEOUT, DEF_TIKV_GRPC_WARM_UP_TIMEOUT);
setIfMissing(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT, DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
Expand Down Expand Up @@ -124,6 +125,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_TLS_ENABLE, DEF_TIKV_TLS_ENABLE);
setIfMissing(TIKV_USE_JKS, DEF_TIKV_USE_JKS);
setIfMissing(TIFLASH_ENABLE, DEF_TIFLASH_ENABLE);
setIfMissing(TIKV_WARM_UP_ENABLE, DEF_TIKV_WARM_UP_ENABLE);
setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
Expand All @@ -146,6 +148,7 @@ private static void loadFromDefaultProperties() {
TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
setIfMissing(
TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT);
}

public static void listAll() {
Expand Down Expand Up @@ -309,6 +312,7 @@ private static ReplicaRead getReplicaRead(String key) {
private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long ingestTimeout = getTimeAsMs(TIKV_GRPC_INGEST_TIMEOUT);
private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT);
private long warmUpTimeout = getTimeAsMs(TIKV_GRPC_WARM_UP_TIMEOUT);
private long pdFirstGetMemberTimeout = getTimeAsMs(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
Expand Down Expand Up @@ -376,6 +380,7 @@ private static ReplicaRead getReplicaRead(String key) {
private String jksTrustPassword = getOption(TIKV_JKS_TRUST_PASSWORD).orElse(null);

private boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE);
private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE);

private boolean isTest = false;

Expand All @@ -393,6 +398,8 @@ private static ReplicaRead getReplicaRead(String key) {
private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);

private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -475,6 +482,15 @@ public TiConfiguration setForwardTimeout(long timeout) {
return this;
}

public long getWarmUpTimeout() {
return warmUpTimeout;
}

public TiConfiguration setWarmUpTimeout(long timeout) {
this.warmUpTimeout = timeout;
return this;
}

public long getPdFirstGetMemberTimeout() {
return pdFirstGetMemberTimeout;
}
Expand Down Expand Up @@ -811,6 +827,14 @@ public boolean isTiFlashEnabled() {
return tiFlashEnable;
}

public boolean isWarmUpEnable() {
return warmUpEnable;
}

public void setWarmUpEnable(boolean warmUpEnable) {
this.warmUpEnable = warmUpEnable;
}

public boolean isTlsEnable() {
return tlsEnable;
}
Expand Down Expand Up @@ -1023,4 +1047,12 @@ public int getCircuitBreakAttemptRequestCount() {
public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) {
this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount;
}

public int getScanRegionsLimit() {
return scanRegionsLimit;
}

public void setScanRegionsLimit(int scanRegionsLimit) {
this.scanRegionsLimit = scanRegionsLimit;
}
}
48 changes: 33 additions & 15 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.tikv.common.util.*;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.service.failsafe.CircuitBreaker;
Expand Down Expand Up @@ -137,7 +138,9 @@ public TiSession(TiConfiguration conf) {
if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available");
}
warmUp();
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}
Expand Down Expand Up @@ -169,24 +172,39 @@ private synchronized void warmUp() {
null,
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
}
ByteString startKey = ByteString.EMPTY;

// use scan region to load region cache with limit
ByteString startKey = ByteString.EMPTY;
do {
TiRegion region = regionManager.getRegionByKey(startKey);
startKey = region.getEndKey();
List<Pdpb.Region> regions =
regionManager.scanRegions(
ConcreteBackOffer.newGetBackOff(),
startKey,
ByteString.EMPTY,
conf.getScanRegionsLimit());
if (regions == null || regions.isEmpty()) {
// something went wrong, but the warm-up process could continue
break;
}
for (Pdpb.Region region : regions) {
regionManager.insertRegionToCache(
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
}
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
} while (!startKey.isEmpty());

RawKVClient rawKVClient = createRawClient();
ByteString exampleKey = ByteString.EMPTY;
Optional<ByteString> prev = rawKVClient.get(exampleKey);
if (prev.isPresent()) {
rawKVClient.delete(exampleKey);
rawKVClient.putIfAbsent(exampleKey, prev.get());
rawKVClient.put(exampleKey, prev.get());
} else {
rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY);
rawKVClient.put(exampleKey, ByteString.EMPTY);
rawKVClient.delete(exampleKey);
try (RawKVClient rawKVClient = createRawClient()) {
ByteString exampleKey = ByteString.EMPTY;
Optional<ByteString> prev = rawKVClient.get(exampleKey);
if (prev.isPresent()) {
rawKVClient.delete(exampleKey);
rawKVClient.putIfAbsent(exampleKey, prev.get());
rawKVClient.put(exampleKey, prev.get());
} else {
rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY);
rawKVClient.put(exampleKey, ByteString.EMPTY);
rawKVClient.delete(exampleKey);
}
}
} catch (Exception e) {
// ignore error
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,6 +42,7 @@
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb;

@SuppressWarnings("UnstableApiUsage")
public class RegionManager {
Expand All @@ -50,6 +52,11 @@ public class RegionManager {
.name("client_java_get_region_by_requests_latency")
.help("getRegionByKey request latency.")
.register();
public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
Histogram.build()
.name("client_java_scan_regions_request_latency")
.help("scanRegions request latency.")
.register();

// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
Expand Down Expand Up @@ -95,6 +102,20 @@ public void invalidateAll() {
cache.invalidateAll();
}

public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
try {
return pdClient.scanRegions(backOffer, startKey, endKey, limit);
} catch (Exception e) {
return new ArrayList<>();
} finally {
requestTimer.observeDuration();
slowLogSpan.end();
}
}

public TiRegion getRegionByKey(ByteString key) {
return getRegionByKey(key, defaultBackOff());
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/tikv/BaseRawKVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ protected TiConfiguration createTiConfiguration() {
? TiConfiguration.createRawDefault()
: TiConfiguration.createRawDefault(pdAddrsStr);
conf.setTest(true);
conf.setEnableAtomicForCAS(true);
conf.setEnableGrpcForward(false);
return conf;
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/org/tikv/common/TiSessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.BaseRawKVTest;
import org.tikv.common.region.TiRegion;
import org.tikv.raw.RawKVClient;

public class TiSessionTest extends BaseRawKVTest {
private static final Logger logger = LoggerFactory.getLogger(TiSessionTest.class);
private TiSession session;

@After
Expand Down Expand Up @@ -122,4 +125,31 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception {
assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor"));
}
}

@Test
public void warmUpTest() throws Exception {
TiConfiguration conf = createTiConfiguration();
conf.setWarmUpEnable(true);
long t0 = doTest(conf);
conf.setWarmUpEnable(false);
long t1 = doTest(conf);
assertTrue(t0 < t1);
}

private long doTest(TiConfiguration conf) throws Exception {
session = TiSession.create(conf);
long start = System.currentTimeMillis();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it better to use System.nanoTime()?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not necessary in unit test.

try (RawKVClient client = session.createRawClient()) {
client.get(ByteString.EMPTY);
}
long end = System.currentTimeMillis();
logger.info(
"[warm up "
+ (conf.isWarmUpEnable() ? "enabled" : "disabled")
+ "] duration "
+ (end - start)
+ "ms");
session.close();
return end - start;
}
}