Skip to content

Commit b60afd0

Browse files
authored
Optimize grpc forward and switch leader logic (#324)
1 parent b1e0c93 commit b60afd0

16 files changed

+277
-246
lines changed

src/main/java/org/tikv/common/AbstractGRPCClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.health.v1.HealthCheckRequest;
2424
import io.grpc.health.v1.HealthCheckResponse;
2525
import io.grpc.health.v1.HealthGrpc;
26+
import io.grpc.stub.AbstractFutureStub;
2627
import io.grpc.stub.AbstractStub;
2728
import io.grpc.stub.ClientCalls;
2829
import io.grpc.stub.StreamObserver;
@@ -38,14 +39,15 @@
3839
import org.tikv.common.util.ChannelFactory;
3940

4041
public abstract class AbstractGRPCClient<
41-
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
42+
BlockingStubT extends AbstractStub<BlockingStubT>,
43+
FutureStubT extends AbstractFutureStub<FutureStubT>>
4244
implements AutoCloseable {
4345
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
4446
protected final ChannelFactory channelFactory;
4547
protected TiConfiguration conf;
4648
protected long timeout;
4749
protected BlockingStubT blockingStub;
48-
protected StubT asyncStub;
50+
protected FutureStubT asyncStub;
4951

5052
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
5153
this.conf = conf;
@@ -57,7 +59,7 @@ protected AbstractGRPCClient(
5759
TiConfiguration conf,
5860
ChannelFactory channelFactory,
5961
BlockingStubT blockingStub,
60-
StubT asyncStub) {
62+
FutureStubT asyncStub) {
6163
this.conf = conf;
6264
this.timeout = conf.getTimeout();
6365
this.channelFactory = channelFactory;
@@ -109,7 +111,7 @@ protected <ReqT, RespT> void callAsyncWithRetry(
109111
.create(handler)
110112
.callWithRetry(
111113
() -> {
112-
StubT stub = getAsyncStub();
114+
FutureStubT stub = getAsyncStub();
113115
ClientCalls.asyncUnaryCall(
114116
stub.getChannel().newCall(method, stub.getCallOptions()),
115117
requestFactory.get(),
@@ -133,7 +135,7 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
133135
.create(handler)
134136
.callWithRetry(
135137
() -> {
136-
StubT stub = getAsyncStub();
138+
FutureStubT stub = getAsyncStub();
137139
return asyncBidiStreamingCall(
138140
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
139141
},
@@ -175,7 +177,7 @@ public long getTimeout() {
175177

176178
protected abstract BlockingStubT getBlockingStub();
177179

178-
protected abstract StubT getAsyncStub();
180+
protected abstract FutureStubT getAsyncStub();
179181

180182
protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
181183
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);

src/main/java/org/tikv/common/PDClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import org.tikv.kvproto.Metapb.Store;
7070
import org.tikv.kvproto.PDGrpc;
7171
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
72-
import org.tikv.kvproto.PDGrpc.PDStub;
72+
import org.tikv.kvproto.PDGrpc.PDFutureStub;
7373
import org.tikv.kvproto.Pdpb;
7474
import org.tikv.kvproto.Pdpb.Error;
7575
import org.tikv.kvproto.Pdpb.ErrorType;
@@ -92,7 +92,7 @@
9292
import org.tikv.kvproto.Pdpb.TsoRequest;
9393
import org.tikv.kvproto.Pdpb.TsoResponse;
9494

95-
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
95+
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
9696
implements ReadOnlyPDClient {
9797
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
9898
private static final long MIN_TRY_UPDATE_DURATION = 50;
@@ -550,7 +550,7 @@ protected PDBlockingStub getBlockingStub() {
550550
}
551551

552552
@Override
553-
protected PDStub getAsyncStub() {
553+
protected PDFutureStub getAsyncStub() {
554554
if (pdClientWrapper == null) {
555555
throw new GrpcException("PDClient may not be initialized");
556556
}
@@ -644,7 +644,7 @@ private void initCluster() {
644644
static class PDClientWrapper {
645645
private final String leaderInfo;
646646
private final PDBlockingStub blockingStub;
647-
private final PDStub asyncStub;
647+
private final PDFutureStub asyncStub;
648648
private final long createTime;
649649
private final String storeAddress;
650650

@@ -655,10 +655,10 @@ static class PDClientWrapper {
655655
header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
656656
this.blockingStub =
657657
MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header);
658-
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header);
658+
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header);
659659
} else {
660660
this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
661-
this.asyncStub = PDGrpc.newStub(clientChannel);
661+
this.asyncStub = PDGrpc.newFutureStub(clientChannel);
662662
}
663663
this.leaderInfo = leaderInfo;
664664
this.storeAddress = storeAddress;
@@ -677,7 +677,7 @@ PDBlockingStub getBlockingStub() {
677677
return blockingStub;
678678
}
679679

680-
PDStub getAsyncStub() {
680+
PDFutureStub getAsyncStub() {
681681
return asyncStub;
682682
}
683683

src/main/java/org/tikv/common/TiConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable {
3535

3636
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
3737
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
38-
public static final Metadata.Key FORWARD_META_DATA_KEY =
38+
public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
3939
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
40-
public static final Metadata.Key PD_FORWARD_META_DATA_KEY =
40+
public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
4141
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
4242

4343
static {

src/main/java/org/tikv/common/operation/KVErrorHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
9494
Errorpb.Error error = regionHandler.getRegionError(resp);
9595
if (error != null) {
9696
return regionHandler.handleRegionError(backOffer, error);
97-
} else {
98-
regionHandler.tryUpdateRegionStore();
9997
}
10098

10199
// Key error handling logic

src/main/java/org/tikv/common/operation/RegionErrorHandler.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,10 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
4646
Errorpb.Error error = getRegionError(resp);
4747
if (error != null) {
4848
return handleRegionError(backOffer, error);
49-
} else {
50-
tryUpdateRegionStore();
5149
}
5250
return false;
5351
}
5452

55-
public void tryUpdateRegionStore() {
56-
recv.tryUpdateRegionStore();
57-
}
58-
5953
public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
6054
if (error.hasNotLeader()) {
6155
// this error is reported from raftstore:

0 commit comments

Comments
 (0)