Skip to content

Commit a33c1bb

Browse files
committed
cherry-pick #558
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
1 parent cf14239 commit a33c1bb

25 files changed

+421
-179
lines changed

metrics/grafana/client_java_summary.json

Lines changed: 164 additions & 36 deletions
Large diffs are not rendered by default.

src/main/java/org/tikv/common/KVClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public void close() {}
6565
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
6666
*/
6767
public ByteString get(ByteString key, long version) throws GrpcException {
68-
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
68+
BackOffer backOffer =
69+
ConcreteBackOffer.newGetBackOff(
70+
clientBuilder.getRegionManager().getPDClient().getClusterId());
6971
while (true) {
7072
RegionStoreClient client = clientBuilder.build(key);
7173
try {

src/main/java/org/tikv/common/PDClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
113113
HistogramUtils.buildDuration()
114114
.name("client_java_pd_get_region_by_requests_latency")
115115
.help("pd getRegionByKey request latency.")
116+
.labelNames("cluster")
116117
.register();
117118

118119
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
@@ -203,7 +204,7 @@ private GetOperatorResponse getOperator(long regionId) {
203204
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
204205
// get operator no need to handle error and no need back offer.
205206
return callWithRetry(
206-
ConcreteBackOffer.newCustomBackOff(0),
207+
ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
207208
PDGrpc.getGetOperatorMethod(),
208209
request,
209210
new NoopHandler<>());
@@ -231,7 +232,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
231232

232233
@Override
233234
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
234-
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
235+
Histogram.Timer requestTimer =
236+
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
235237
try {
236238
if (conf.getKvMode() == KVMode.TXN) {
237239
CodecDataOutput cdo = new CodecDataOutput();
@@ -745,7 +747,8 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
745747
return builder.build();
746748
}
747749

748-
public long getClusterId() {
750+
@Override
751+
public Long getClusterId() {
749752
return header.getClusterId();
750753
}
751754

src/main/java/org/tikv/common/ReadOnlyPDClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,6 @@ List<Pdpb.Region> scanRegions(
6565
List<Store> getAllStores(BackOffer backOffer);
6666

6767
TiConfiguration.ReplicaRead getReplicaRead();
68+
69+
Long getClusterId();
6870
}

src/main/java/org/tikv/common/Snapshot.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ public List<org.tikv.common.BytePairWrapper> batchGet(int backOffer, List<byte[]
7878
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
7979
List<KvPair> kvPairList =
8080
client.batchGet(
81-
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
81+
ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
82+
list,
83+
timestamp.getVersion());
8284
return kvPairList
8385
.stream()
8486
.map(

src/main/java/org/tikv/common/StoreVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public static int compareTo(String v0, String v1) {
6060
public static boolean minTiKVVersion(String version, PDClient pdClient) {
6161
StoreVersion storeVersion = new StoreVersion(version);
6262

63-
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
63+
BackOffer bo =
64+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
6465
List<Metapb.Store> storeList =
6566
pdClient
6667
.getAllStores(bo)

src/main/java/org/tikv/common/TiSession.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,13 @@ public TiSession(TiConfiguration conf) {
9191
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
9292
warmUp();
9393
}
94-
this.circuitBreaker = new CircuitBreakerImpl(conf);
94+
this.circuitBreaker = new CircuitBreakerImpl(conf, getPDClient().getClusterId());
9595
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
9696
}
9797

9898
private synchronized void warmUp() {
9999
long warmUpStartTime = System.nanoTime();
100-
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
100+
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
101101

102102
try {
103103
// let JVM ClassLoader load gRPC error related classes
@@ -128,7 +128,9 @@ private synchronized void warmUp() {
128128
}
129129
for (Pdpb.Region region : regions) {
130130
regionManager.insertRegionToCache(
131-
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
131+
regionManager.createRegion(
132+
region.getRegion(),
133+
ConcreteBackOffer.newGetBackOff(getPDClient().getClusterId())));
132134
}
133135
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
134136
} while (!startKey.isEmpty());
@@ -226,7 +228,8 @@ public TiConfiguration getConf() {
226228
public TiTimestamp getTimestamp() {
227229
checkIsClosed();
228230

229-
return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
231+
return getPDClient()
232+
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
230233
}
231234

232235
public Snapshot createSnapshot() {
@@ -459,7 +462,7 @@ public void splitRegionAndScatter(
459462
.stream()
460463
.map(k -> Key.toRawKey(k).next().toByteString())
461464
.collect(Collectors.toList()),
462-
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
465+
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));
463466

464467
// scatter region
465468
for (Metapb.Region newRegion : newRegions) {
@@ -482,7 +485,9 @@ public void splitRegionAndScatter(
482485
return;
483486
}
484487
getPDClient()
485-
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS));
488+
.waitScatterRegionFinish(
489+
newRegion,
490+
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
486491
}
487492
} else {
488493
logger.info("skip to wait scatter region finish");

src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
7474
try (RegionStoreClient client = builder.build(startKey)) {
7575
client.setTimeout(conf.getScanTimeout());
7676
region = client.getRegion();
77-
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
77+
BackOffer backOffer =
78+
ConcreteBackOffer.newScannerNextMaxBackOff(
79+
builder.getRegionManager().getPDClient().getClusterId());
7880
currentCache = client.scan(backOffer, startKey, version);
7981
return region;
8082
}
@@ -86,7 +88,8 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
8688
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
8789
TiRegion region = pair.first;
8890
TiStore store = pair.second;
89-
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
91+
BackOffer backOffer =
92+
ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
9093
try (RegionStoreClient client = builder.build(region, store)) {
9194
return client.get(backOffer, current.getKey(), version);
9295
} catch (Exception e) {

src/main/java/org/tikv/common/policy/RetryPolicy.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,19 @@ public abstract class RetryPolicy<RespT> {
3333
HistogramUtils.buildDuration()
3434
.name("client_java_grpc_single_requests_latency")
3535
.help("grpc request latency.")
36-
.labelNames("type")
36+
.labelNames("type", "cluster")
3737
.register();
3838
public static final Histogram CALL_WITH_RETRY_DURATION =
3939
HistogramUtils.buildDuration()
4040
.name("client_java_call_with_retry_duration")
4141
.help("callWithRetry duration.")
42-
.labelNames("type")
42+
.labelNames("type", "cluster")
4343
.register();
4444
public static final Counter GRPC_REQUEST_RETRY_NUM =
4545
Counter.build()
4646
.name("client_java_grpc_requests_retry_num")
4747
.help("grpc request retry num.")
48-
.labelNames("type")
48+
.labelNames("type", "cluster")
4949
.register();
5050

5151
// handles PD and TiKV's error.
@@ -70,16 +70,16 @@ private void rethrowNotRecoverableException(Exception e) {
7070
}
7171

7272
public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
73-
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
73+
String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
74+
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
7475
SlowLogSpan callWithRetrySlowLogSpan =
7576
backOffer.getSlowLog().start("callWithRetry " + methodName);
7677
try {
7778
while (true) {
7879
RespT result = null;
7980
try {
8081
// add single request duration histogram
81-
Histogram.Timer requestTimer =
82-
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
82+
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
8383
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName);
8484
try {
8585
result = proc.call();
@@ -93,7 +93,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
9393
backOffer.checkTimeout();
9494
boolean retry = handler.handleRequestError(backOffer, e);
9595
if (retry) {
96-
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
96+
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
9797
continue;
9898
} else {
9999
return result;
@@ -104,7 +104,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
104104
if (handler != null) {
105105
boolean retry = handler.handleResponseError(backOffer, result);
106106
if (retry) {
107-
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
107+
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
108108
continue;
109109
}
110110
}

src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ public abstract class AbstractRegionStoreClient
5252
HistogramUtils.buildDuration()
5353
.name("client_java_seek_leader_store_duration")
5454
.help("seek leader store duration.")
55+
.labelNames("cluster")
5556
.register();
5657

5758
public static final Histogram SEEK_PROXY_STORE_DURATION =
5859
HistogramUtils.buildDuration()
5960
.name("client_java_seek_proxy_store_duration")
6061
.help("seek proxy store duration.")
62+
.labelNames("cluster")
6163
.register();
6264

6365
protected final RegionManager regionManager;
@@ -181,7 +183,10 @@ private void updateClientStub() {
181183
}
182184

183185
private Boolean seekLeaderStore(BackOffer backOffer) {
184-
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
186+
Histogram.Timer switchLeaderDurationTimer =
187+
SEEK_LEADER_STORE_DURATION
188+
.labels(regionManager.getPDClient().getClusterId().toString())
189+
.startTimer();
185190
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
186191
try {
187192
List<Metapb.Peer> peers = region.getFollowerList();
@@ -229,7 +234,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {
229234

230235
private boolean seekProxyStore(BackOffer backOffer) {
231236
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
232-
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
237+
Histogram.Timer grpcForwardDurationTimer =
238+
SEEK_PROXY_STORE_DURATION
239+
.labels(regionManager.getPDClient().getClusterId().toString())
240+
.startTimer();
233241
try {
234242
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
235243
// when current leader cannot be reached

0 commit comments

Comments
 (0)