Skip to content

Commit 6cbf56a

Browse files
authored
[to #556] metrics: attach cluster label to metrics (#558)
1 parent f4e7c30 commit 6cbf56a

31 files changed

+470
-237
lines changed

metrics/grafana/client_java_summary.json

Lines changed: 67 additions & 36 deletions
Large diffs are not rendered by default.

src/main/java/org/tikv/common/AbstractGRPCClient.java

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,16 @@ public <ReqT, RespT> RespT callWithRetry(
8282
if (logger.isTraceEnabled()) {
8383
logger.trace(String.format("Calling %s...", method.getFullMethodName()));
8484
}
85-
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
85+
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
8686
RespT resp =
87-
builder
88-
.create(handler)
89-
.callWithRetry(
90-
() -> {
91-
BlockingStubT stub = getBlockingStub();
92-
return ClientCalls.blockingUnaryCall(
93-
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
94-
},
95-
method.getFullMethodName(),
96-
backOffer);
87+
policy.callWithRetry(
88+
() -> {
89+
BlockingStubT stub = getBlockingStub();
90+
return ClientCalls.blockingUnaryCall(
91+
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
92+
},
93+
method.getFullMethodName(),
94+
backOffer);
9795

9896
if (logger.isTraceEnabled()) {
9997
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
@@ -109,20 +107,18 @@ protected <ReqT, RespT> void callAsyncWithRetry(
109107
ErrorHandler<RespT> handler) {
110108
logger.debug(String.format("Calling %s...", method.getFullMethodName()));
111109

112-
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
113-
builder
114-
.create(handler)
115-
.callWithRetry(
116-
() -> {
117-
FutureStubT stub = getAsyncStub();
118-
ClientCalls.asyncUnaryCall(
119-
stub.getChannel().newCall(method, stub.getCallOptions()),
120-
requestFactory.get(),
121-
responseObserver);
122-
return null;
123-
},
124-
method.getFullMethodName(),
125-
backOffer);
110+
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
111+
policy.callWithRetry(
112+
() -> {
113+
FutureStubT stub = getAsyncStub();
114+
ClientCalls.asyncUnaryCall(
115+
stub.getChannel().newCall(method, stub.getCallOptions()),
116+
requestFactory.get(),
117+
responseObserver);
118+
return null;
119+
},
120+
method.getFullMethodName(),
121+
backOffer);
126122
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
127123
}
128124

@@ -133,18 +129,17 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
133129
ErrorHandler<StreamObserver<ReqT>> handler) {
134130
logger.debug(String.format("Calling %s...", method.getFullMethodName()));
135131

136-
RetryPolicy.Builder<StreamObserver<ReqT>> builder = new Builder<>(backOffer);
132+
RetryPolicy<StreamObserver<ReqT>> policy =
133+
new Builder<StreamObserver<ReqT>>(backOffer).create(handler);
137134
StreamObserver<ReqT> observer =
138-
builder
139-
.create(handler)
140-
.callWithRetry(
141-
() -> {
142-
FutureStubT stub = getAsyncStub();
143-
return asyncBidiStreamingCall(
144-
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
145-
},
146-
method.getFullMethodName(),
147-
backOffer);
135+
policy.callWithRetry(
136+
() -> {
137+
FutureStubT stub = getAsyncStub();
138+
return asyncBidiStreamingCall(
139+
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
140+
},
141+
method.getFullMethodName(),
142+
backOffer);
148143
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
149144
return observer;
150145
}
@@ -156,19 +151,18 @@ public <ReqT, RespT> StreamingResponse callServerStreamingWithRetry(
156151
ErrorHandler<StreamingResponse> handler) {
157152
logger.debug(String.format("Calling %s...", method.getFullMethodName()));
158153

159-
RetryPolicy.Builder<StreamingResponse> builder = new Builder<>(backOffer);
154+
RetryPolicy<StreamingResponse> policy =
155+
new Builder<StreamingResponse>(backOffer).create(handler);
160156
StreamingResponse response =
161-
builder
162-
.create(handler)
163-
.callWithRetry(
164-
() -> {
165-
BlockingStubT stub = getBlockingStub();
166-
return new StreamingResponse(
167-
blockingServerStreamingCall(
168-
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
169-
},
170-
method.getFullMethodName(),
171-
backOffer);
157+
policy.callWithRetry(
158+
() -> {
159+
BlockingStubT stub = getBlockingStub();
160+
return new StreamingResponse(
161+
blockingServerStreamingCall(
162+
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
163+
},
164+
method.getFullMethodName(),
165+
backOffer);
172166
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
173167
return response;
174168
}

src/main/java/org/tikv/common/KVClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public void close() {}
6565
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
6666
*/
6767
public ByteString get(ByteString key, long version) throws GrpcException {
68-
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
68+
BackOffer backOffer =
69+
ConcreteBackOffer.newGetBackOff(
70+
clientBuilder.getRegionManager().getPDClient().getClusterId());
6971
while (true) {
7072
RegionStoreClient client = clientBuilder.build(key);
7173
try {

src/main/java/org/tikv/common/PDClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
127127
HistogramUtils.buildDuration()
128128
.name("client_java_pd_get_region_by_requests_latency")
129129
.help("pd getRegionByKey request latency.")
130+
.labelNames("cluster")
130131
.register();
131132

132133
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
@@ -281,7 +282,7 @@ private GetOperatorResponse getOperator(long regionId) {
281282
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
282283
// get operator no need to handle error and no need back offer.
283284
return callWithRetry(
284-
ConcreteBackOffer.newCustomBackOff(0),
285+
ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
285286
PDGrpc.getGetOperatorMethod(),
286287
request,
287288
new NoopHandler<>());
@@ -309,7 +310,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
309310

310311
@Override
311312
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
312-
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
313+
Histogram.Timer requestTimer =
314+
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
313315
try {
314316
if (conf.isTxnKVMode()) {
315317
CodecDataOutput cdo = new CodecDataOutput();
@@ -841,7 +843,7 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
841843
return builder.build();
842844
}
843845

844-
public long getClusterId() {
846+
public Long getClusterId() {
845847
return header.getClusterId();
846848
}
847849

src/main/java/org/tikv/common/ReadOnlyPDClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,6 @@ List<Pdpb.Region> scanRegions(
6767
List<Store> getAllStores(BackOffer backOffer);
6868

6969
TiConfiguration.ReplicaRead getReplicaRead();
70+
71+
Long getClusterId();
7072
}

src/main/java/org/tikv/common/Snapshot.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ public List<org.tikv.common.BytePairWrapper> batchGet(int backOffer, List<byte[]
8080
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
8181
List<KvPair> kvPairList =
8282
client.batchGet(
83-
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
83+
ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
84+
list,
85+
timestamp.getVersion());
8486
return kvPairList
8587
.stream()
8688
.map(

src/main/java/org/tikv/common/StoreVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public static int compareTo(String v0, String v1) {
6262
public static boolean minTiKVVersion(String version, PDClient pdClient) {
6363
StoreVersion storeVersion = new StoreVersion(version);
6464

65-
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
65+
BackOffer bo =
66+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
6667
List<Metapb.Store> storeList =
6768
pdClient
6869
.getAllStores(bo)

src/main/java/org/tikv/common/TiSession.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public TiSession(TiConfiguration conf) {
158158
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
159159
warmUp();
160160
}
161-
this.circuitBreaker = new CircuitBreakerImpl(conf);
161+
this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
162162
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
163163
}
164164

@@ -179,7 +179,7 @@ private static VersionInfo getVersionInfo() {
179179

180180
private synchronized void warmUp() {
181181
long warmUpStartTime = System.nanoTime();
182-
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
182+
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
183183
try {
184184
// let JVM ClassLoader load gRPC error related classes
185185
// this operation may cost 100ms
@@ -329,7 +329,8 @@ public TiConfiguration getConf() {
329329
public TiTimestamp getTimestamp() {
330330
checkIsClosed();
331331

332-
return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
332+
return getPDClient()
333+
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
333334
}
334335

335336
public Snapshot createSnapshot() {
@@ -586,13 +587,16 @@ public void splitRegionAndScatter(
586587
.stream()
587588
.map(k -> Key.toRawKey(k).toByteString())
588589
.collect(Collectors.toList()),
589-
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
590+
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));
590591

591592
// scatter region
592593
for (Metapb.Region newRegion : newRegions) {
593594
try {
594595
getPDClient()
595-
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS));
596+
.scatterRegion(
597+
newRegion,
598+
ConcreteBackOffer.newCustomBackOff(
599+
scatterRegionBackoffMS, getPDClient().getClusterId()));
596600
} catch (Exception e) {
597601
logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
598602
}
@@ -609,7 +613,9 @@ public void splitRegionAndScatter(
609613
return;
610614
}
611615
getPDClient()
612-
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS));
616+
.waitScatterRegionFinish(
617+
newRegion,
618+
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
613619
}
614620
} else {
615621
logger.info("skip to wait scatter region finish");

src/main/java/org/tikv/common/importer/ImporterClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,9 @@ private void ingest() throws GrpcException {
259259
}
260260

261261
Object writeResponse = clientLeader.getWriteResponse();
262-
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF);
262+
BackOffer backOffer =
263+
ConcreteBackOffer.newCustomBackOff(
264+
BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
263265
ingestWithRetry(writeResponse, backOffer);
264266
}
265267

src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ private void switchTiKVToImportMode() {
7373
}
7474

7575
private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
76-
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
76+
BackOffer bo =
77+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
7778
List<Metapb.Store> allStores = pdClient.getAllStores(bo);
7879
for (Metapb.Store store : allStores) {
7980
ImporterStoreClient client = builder.build(new TiStore(store));

0 commit comments

Comments
 (0)