Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1c97d49
api-v2: enable api-v2
iosmanthus Mar 24, 2022
0d485f2
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Mar 25, 2022
f45d583
attach api version to context and fix lock resolver api version
iosmanthus Mar 25, 2022
8ff27f2
./dev/javafmt
iosmanthus Mar 28, 2022
7bb666b
fix api v2 config test
iosmanthus Mar 28, 2022
790ffd2
add topb test
iosmanthus Mar 28, 2022
c5e502e
fix ingest interface for rawkv
iosmanthus Apr 6, 2022
8a11a88
fix ingest interface for rawkv
iosmanthus Apr 12, 2022
ec2d127
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Apr 15, 2022
9b0363c
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Apr 19, 2022
f9f4cfe
add api version test
iosmanthus Apr 19, 2022
e5ff7e0
add license header for new files
iosmanthus Apr 19, 2022
3e63dc4
fix ApiVersionTest for older version of TiKV
iosmanthus Apr 20, 2022
e44dee7
Merge branch 'api-v2' of github.com:iosmanthus/client-java; branch 'm…
iosmanthus May 12, 2022
04fa138
fix kvproto wrong commit
iosmanthus May 17, 2022
66d07c4
wip: add mock test for error handler for EpochNotMatch region error i…
iosmanthus May 23, 2022
153ec6f
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus May 24, 2022
6771d13
add mock test for EpochNotMatch in API v2
iosmanthus May 24, 2022
b7c54f8
fix license header
iosmanthus May 24, 2022
72eb371
add some comments for epoch not match mock test
iosmanthus May 24, 2022
8443588
git fire!: add tests for scanRegions
iosmanthus May 25, 2022
62879c6
add tests for scanRegions
iosmanthus May 26, 2022
edbb9f5
add mock test for ApiV2 PD client
iosmanthus May 26, 2022
8b22d77
revert src/main/java/org/tikv/common/operation/iterator/RawScanIterat…
iosmanthus May 26, 2022
34a81a0
refactor: introduce RequestKeyCodec to reduce if
iosmanthus May 30, 2022
a72a6cf
./dev/javafmt
iosmanthus May 30, 2022
de87cbb
remove codec in TiConfiguration
iosmanthus May 30, 2022
6d43080
remove decodeRegion in PDClient
iosmanthus May 30, 2022
ef71788
remove extra ;
iosmanthus May 30, 2022
d252888
Merge branch 'api-v2' of github.com:iosmanthus/client-java into api-v2
iosmanthus May 30, 2022
fec799c
add batch encode method for RequestKeyCodec
iosmanthus May 31, 2022
0a64f84
print stack trace for unstable tests
iosmanthus May 31, 2022
cedc40d
fix wrong base class for PDClientV2MockTest
iosmanthus May 31, 2022
afd5858
wip: add tests for RequestKeyCodec
iosmanthus May 31, 2022
7c02154
./dev/javafmt
iosmanthus May 31, 2022
0e0bd46
complete RequestKeyCodecTest
iosmanthus Jun 1, 2022
1db88aa
remove if in scanRegions
iosmanthus Jun 1, 2022
629d4fa
change Pair type to org.tikv.common.util.Pair
iosmanthus Jun 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public class ConfigUtils {

public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable";

public static final String TIKV_API_VERSION = "tikv.api_version";;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a redundant semicolon.


public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s";
Expand Down Expand Up @@ -200,4 +203,6 @@ public class ConfigUtils {
public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;

public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000;

public static final int DEF_TIKV_API_VERSION = 1;
Comment thread
sunxiaoguang marked this conversation as resolved.
}
32 changes: 14 additions & 18 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.isTxnKVMode()) {
if (conf.isTxnKVMode() || conf.getApiVersion().isV2()) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
Expand All @@ -326,7 +326,7 @@ public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, Byte

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
return new Pair<>(decodeRegion(resp.getRegion()), resp.getLeader());
} finally {
requestTimer.observeDuration();
}
Expand Down Expand Up @@ -806,35 +806,31 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
.setRegionEpoch(region.getRegionEpoch())
.addAllPeers(region.getPeersList());

if (region.getStartKey().isEmpty() || isRawRegion) {
if (conf.getApiVersion().isV1() && (region.getStartKey().isEmpty() || isRawRegion)) {
builder.setStartKey(region.getStartKey());
} else {
if (!conf.isTest()) {
try {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
} else {
try {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
} catch (Exception e) {
builder.setStartKey(region.getStartKey());
} catch (Exception e) {
if (!conf.isTest()) {
throw e;
}
builder.setStartKey(region.getStartKey());
}
}

if (region.getEndKey().isEmpty() || isRawRegion) {
if (conf.getApiVersion().isV1() && (region.getEndKey().isEmpty() || isRawRegion)) {
builder.setEndKey(region.getEndKey());
} else {
if (!conf.isTest()) {
try {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
} else {
try {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
} catch (Exception e) {
builder.setEndKey(region.getEndKey());
} catch (Exception e) {
if (!conf.isTest()) {
throw e;
}
builder.setEndKey(region.getEndKey());
}
}

Expand Down
183 changes: 181 additions & 2 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,155 @@

package org.tikv.common;

import static org.tikv.common.ConfigUtils.*;
import static org.tikv.common.ConfigUtils.DEF_BATCH_DELETE_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_BATCH_GET_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_BATCH_PUT_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_BATCH_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_CHECK_HEALTH_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_DB_PREFIX;
import static org.tikv.common.ConfigUtils.DEF_DELETE_RANGE_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_FORWARD_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_GRPC_FORWARD_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_HEALTH_CHECK_PERIOD_DURATION;
import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.DEF_INDEX_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_KV_CLIENT_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_MAX_FRAME_SIZE;
import static org.tikv.common.ConfigUtils.DEF_METRICS_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_METRICS_PORT;
import static org.tikv.common.ConfigUtils.DEF_PD_ADDRESSES;
import static org.tikv.common.ConfigUtils.DEF_REPLICA_READ;
import static org.tikv.common.ConfigUtils.DEF_SCAN_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.DEF_SCAN_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_SHOW_ROWID;
import static org.tikv.common.ConfigUtils.DEF_TABLE_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_TIFLASH_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_TIKV_API_VERSION;
import static org.tikv.common.ConfigUtils.DEF_TIKV_BO_REGION_MISS_BASE_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_ENABLE_ATOMIC_FOR_CAS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_IDLE_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_INGEST_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIME;
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_GRPC_WARM_UP_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES;
import static org.tikv.common.ConfigUtils.DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.DEF_TIKV_NETWORK_MAPPING_NAME;
import static org.tikv.common.ConfigUtils.DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_SCAN_REGIONS_LIMIT;
import static org.tikv.common.ConfigUtils.DEF_TIKV_SCATTER_WAIT_SECONDS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_TLS_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_TIKV_USE_JKS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_WARM_UP_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_TIMEOUT;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS;
import static org.tikv.common.ConfigUtils.FOLLOWER;
import static org.tikv.common.ConfigUtils.HIGH_COMMAND_PRIORITY;
import static org.tikv.common.ConfigUtils.LEADER_AND_FOLLOWER;
import static org.tikv.common.ConfigUtils.LOW_COMMAND_PRIORITY;
import static org.tikv.common.ConfigUtils.NORMAL_COMMAND_PRIORITY;
import static org.tikv.common.ConfigUtils.RAW_KV_MODE;
import static org.tikv.common.ConfigUtils.READ_COMMITTED_ISOLATION_LEVEL;
import static org.tikv.common.ConfigUtils.SNAPSHOT_ISOLATION_LEVEL;
import static org.tikv.common.ConfigUtils.TIFLASH_ENABLE;
import static org.tikv.common.ConfigUtils.TIKV_API_VERSION;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_GET_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_DB_PREFIX;
import static org.tikv.common.ConfigUtils.TIKV_DELETE_RANGE_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_ENABLE_ATOMIC_FOR_CAS;
import static org.tikv.common.ConfigUtils.TIKV_ENABLE_GRPC_FORWARD;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_FORWARD_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_HEALTH_CHECK_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_IDLE_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_INGEST_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIME;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_KEEPALIVE_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_MAX_FRAME_SIZE;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_GRPC_WARM_UP_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_HEALTH_CHECK_PERIOD_DURATION;
import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_BYTES;
import static org.tikv.common.ConfigUtils.TIKV_IMPORTER_MAX_KV_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_BATCH_SIZE;
import static org.tikv.common.ConfigUtils.TIKV_INDEX_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PASSWORD;
import static org.tikv.common.ConfigUtils.TIKV_JKS_KEY_PATH;
import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PASSWORD;
import static org.tikv.common.ConfigUtils.TIKV_JKS_TRUST_PATH;
import static org.tikv.common.ConfigUtils.TIKV_KEY_CERT_CHAIN;
import static org.tikv.common.ConfigUtils.TIKV_KEY_FILE;
import static org.tikv.common.ConfigUtils.TIKV_KV_CLIENT_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_KV_MODE;
import static org.tikv.common.ConfigUtils.TIKV_METRICS_ENABLE;
import static org.tikv.common.ConfigUtils.TIKV_METRICS_PORT;
import static org.tikv.common.ConfigUtils.TIKV_NETWORK_MAPPING_NAME;
import static org.tikv.common.ConfigUtils.TIKV_PD_ADDRESSES;
import static org.tikv.common.ConfigUtils.TIKV_PD_FIRST_GET_MEMBER_TIMEOUT;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_READ_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SCAN_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_SERVER_SLOWLOG_FACTOR;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_SLOWLOG_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_RAWKV_WRITE_TIMEOUT_IN_MS;
import static org.tikv.common.ConfigUtils.TIKV_REPLICA_READ;
import static org.tikv.common.ConfigUtils.TIKV_REQUEST_COMMAND_PRIORITY;
import static org.tikv.common.ConfigUtils.TIKV_REQUEST_ISOLATION_LEVEL;
import static org.tikv.common.ConfigUtils.TIKV_SCAN_REGIONS_LIMIT;
import static org.tikv.common.ConfigUtils.TIKV_SCATTER_WAIT_SECONDS;
import static org.tikv.common.ConfigUtils.TIKV_SHOW_ROWID;
import static org.tikv.common.ConfigUtils.TIKV_TABLE_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_TLS_ENABLE;
import static org.tikv.common.ConfigUtils.TIKV_TRUST_CERT_COLLECTION;
import static org.tikv.common.ConfigUtils.TIKV_USE_JKS;
import static org.tikv.common.ConfigUtils.TIKV_WARM_UP_ENABLE;
import static org.tikv.common.ConfigUtils.TXN_KV_MODE;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ENABLE;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS;

import io.grpc.Metadata;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,13 +175,38 @@
import org.tikv.kvproto.Kvrpcpb.IsolationLevel;

public class TiConfiguration implements Serializable {
public static enum ApiVersion {
V1,
V2;

public static ApiVersion fromInt(int version) {
switch (version) {
case 1:
return V1;
case 2:
return V2;
default:
throw new IllegalArgumentException("unknown api version " + version);
}
}

public boolean isV1() {
return this == V1;
}

public boolean isV2() {
return this == V2;
}
}

private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final String API_V2_RAW_PREFIX = "r";
public static final String API_V2_TXN_PREFIX = "x";

static {
// priority: system environment > config file > default
Expand Down Expand Up @@ -151,6 +317,8 @@ private static void loadFromDefaultProperties() {
setIfMissing(
TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT);

setIfMissing(TIKV_API_VERSION, DEF_TIKV_API_VERSION);
}

public static void listAll() {
Expand Down Expand Up @@ -403,6 +571,8 @@ private static ReplicaRead getReplicaRead(String key) {

private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT);

private ApiVersion apiVersion = ApiVersion.fromInt(getInt(TIKV_API_VERSION));

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -1075,4 +1245,13 @@ public int getScanRegionsLimit() {
public void setScanRegionsLimit(int scanRegionsLimit) {
this.scanRegionsLimit = scanRegionsLimit;
}

public ApiVersion getApiVersion() {
return apiVersion;
}

public TiConfiguration setApiVersion(ApiVersion version) {
this.apiVersion = version;
return this;
}
}
5 changes: 2 additions & 3 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
* contention
*/
public class TiSession implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(TiSession.class);
private static final Map<String, TiSession> sessionCachedMap = new HashMap<>();
private final TiConfiguration conf;
Expand Down Expand Up @@ -156,7 +155,7 @@ public TiSession(TiConfiguration conf) {
logger.info("enable grpc forward for high available");
}
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
warmup();
Comment thread
iosmanthus marked this conversation as resolved.
Outdated
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
Expand All @@ -177,7 +176,7 @@ private static VersionInfo getVersionInfo() {
return info;
}

private synchronized void warmUp() {
private synchronized void warmup() {
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogSpan;
Expand Down Expand Up @@ -155,6 +156,42 @@ public boolean onStoreUnreachable(BackOffer backOffer) {
return false;
}

public ByteString buildRequestKey(ByteString key) {
switch (conf.getApiVersion()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get the cluster version and tikv configuration from PD? so that we don't need to add one more config in the java client and the java client can automatically adjust the encode/decode methods according to the cluster it is connected to.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design, the client should not read the TiKV configuration from PD or themself. Because:

  1. TiKV configurations could be different from each other due to the restart process.
  2. TiDB use client-go, which should have the same behaviour as the client-java, however, TiDB would never use API v2 but use API v1 instead, due to they have a key prefix with m or t. Thus, client-go should not read the API version from the cluster components, instead, just set the version to 1.

case V1:
return key;
case V2:
if (conf.getKvMode() == KVMode.RAW) {
return ByteString.copyFromUtf8(TiConfiguration.API_V2_RAW_PREFIX).concat(key);
} else if (conf.getKvMode() == KVMode.TXN) {
return ByteString.copyFromUtf8(TiConfiguration.API_V2_TXN_PREFIX).concat(key);
}
default:
throw new IllegalArgumentException("unknown api version or kv mode");
}
}

public ByteString unwrapResponseKey(ByteString key) {
switch (conf.getApiVersion()) {
case V1:
return key;
case V2:
if (conf.getKvMode() == KVMode.RAW) {
if (!key.startsWith(ByteString.copyFromUtf8(TiConfiguration.API_V2_RAW_PREFIX))) {
throw new IllegalArgumentException("key corrupted, wrong prefix");
}
return key.substring(1);
} else if (conf.getKvMode() == KVMode.TXN) {
if (!key.startsWith(ByteString.copyFromUtf8(TiConfiguration.API_V2_TXN_PREFIX))) {
throw new IllegalArgumentException("key corrupted, wrong prefix");
}
return key.substring(1);
}
default:
throw new IllegalArgumentException("unknown api version or kv mode");
}
}

private Kvrpcpb.Context addTraceId(Kvrpcpb.Context context, SlowLog slowLog) {
if (slowLog.getThresholdMS() < 0) {
// disable tikv tracing
Expand Down
Loading