Skip to content

Commit e02e61c

Browse files
ti-srebotsunxiaoguangbirdstorm
authored
Support select replica with rich meta data (#171) (#225)
* cherry pick #171 to release-3.1 Signed-off-by: ti-srebot <ti-srebot@pingcap.com> * fix test Signed-off-by: birdstorm <samuelwyf@hotmail.com> Co-authored-by: Xiaoguang Sun <sunxiaoguang@users.noreply.github.com> Co-authored-by: birdstorm <samuelwyf@hotmail.com>
1 parent 8210eaf commit e02e61c

21 files changed

+468
-230
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@
185185
<dependency>
186186
<groupId>org.apache.commons</groupId>
187187
<artifactId>commons-lang3</artifactId>
188-
<version>3.9</version>
188+
<version>3.10</version>
189189
<scope>test</scope>
190190
</dependency>
191191
<dependency>
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021 PingCAP, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.tikv.common;
17+
18+
import static org.tikv.common.pd.PDUtils.addrToUri;
19+
20+
import com.google.common.annotations.Beta;
21+
import io.etcd.jetcd.ByteSequence;
22+
import io.etcd.jetcd.Client;
23+
import io.etcd.jetcd.KeyValue;
24+
import io.etcd.jetcd.kv.GetResponse;
25+
import java.net.URI;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
31+
import java.util.concurrent.ExecutionException;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
public class DefaultHostMapping implements HostMapping {
36+
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
37+
private final Client etcdClient;
38+
private final String networkMappingName;
39+
private final ConcurrentMap<String, String> hostMapping;
40+
private final Logger logger = LoggerFactory.getLogger(DefaultHostMapping.class);
41+
42+
public DefaultHostMapping(Client etcdClient, String networkMappingName) {
43+
this.etcdClient = etcdClient;
44+
this.networkMappingName = networkMappingName;
45+
this.hostMapping = new ConcurrentHashMap<>();
46+
}
47+
48+
private ByteSequence hostToNetworkMappingKey(String host) {
49+
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
50+
return ByteSequence.from(path, StandardCharsets.UTF_8);
51+
}
52+
53+
@Beta
54+
private String getMappedHostFromPD(String host) {
55+
ByteSequence hostKey = hostToNetworkMappingKey(host);
56+
for (int i = 0; i < 5; i++) {
57+
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
58+
try {
59+
GetResponse resp = future.get();
60+
List<KeyValue> kvs = resp.getKvs();
61+
if (kvs.size() != 1) {
62+
break;
63+
}
64+
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
65+
} catch (InterruptedException e) {
66+
Thread.currentThread().interrupt();
67+
} catch (ExecutionException e) {
68+
logger.info("failed to get mapped Host from PD: " + host, e);
69+
break;
70+
} catch (Exception ignore) {
71+
// ignore
72+
break;
73+
}
74+
}
75+
return host;
76+
}
77+
78+
public URI getMappedURI(URI uri) {
79+
if (networkMappingName.isEmpty()) {
80+
return uri;
81+
}
82+
return addrToUri(
83+
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
84+
+ ":"
85+
+ uri.getPort());
86+
}
87+
}

src/main/java/org/tikv/common/HostMapping.java

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -15,73 +15,9 @@
1515

1616
package org.tikv.common;
1717

18-
import static org.tikv.common.pd.PDUtils.addrToUri;
19-
20-
import com.google.common.annotations.Beta;
21-
import io.etcd.jetcd.ByteSequence;
22-
import io.etcd.jetcd.Client;
23-
import io.etcd.jetcd.KeyValue;
24-
import io.etcd.jetcd.kv.GetResponse;
18+
import java.io.Serializable;
2519
import java.net.URI;
26-
import java.nio.charset.StandardCharsets;
27-
import java.util.List;
28-
import java.util.concurrent.CompletableFuture;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentMap;
31-
import java.util.concurrent.ExecutionException;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
34-
35-
public class HostMapping {
36-
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
37-
private final Client etcdClient;
38-
private final String networkMappingName;
39-
private final ConcurrentMap<String, String> hostMapping;
40-
private final Logger logger = LoggerFactory.getLogger(HostMapping.class);
41-
42-
public HostMapping(Client etcdClient, String networkMappingName) {
43-
this.etcdClient = etcdClient;
44-
this.networkMappingName = networkMappingName;
45-
this.hostMapping = new ConcurrentHashMap<>();
46-
}
47-
48-
private ByteSequence hostToNetworkMappingKey(String host) {
49-
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
50-
return ByteSequence.from(path, StandardCharsets.UTF_8);
51-
}
52-
53-
@Beta
54-
private String getMappedHostFromPD(String host) {
55-
ByteSequence hostKey = hostToNetworkMappingKey(host);
56-
for (int i = 0; i < 5; i++) {
57-
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
58-
try {
59-
GetResponse resp = future.get();
60-
List<KeyValue> kvs = resp.getKvs();
61-
if (kvs.size() != 1) {
62-
break;
63-
}
64-
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
65-
} catch (InterruptedException e) {
66-
Thread.currentThread().interrupt();
67-
} catch (ExecutionException e) {
68-
logger.info("failed to get mapped Host from PD: " + host, e);
69-
break;
70-
} catch (Exception ignore) {
71-
// ignore
72-
break;
73-
}
74-
}
75-
return host;
76-
}
7720

78-
public URI getMappedURI(URI uri) {
79-
if (networkMappingName.isEmpty()) {
80-
return uri;
81-
}
82-
return addrToUri(
83-
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
84-
+ ":"
85-
+ uri.getPort());
86-
}
21+
public interface HostMapping extends Serializable {
22+
URI getMappedURI(URI uri);
8723
}

src/main/java/org/tikv/common/PDClient.java

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.net.URI;
3737
import java.nio.charset.StandardCharsets;
3838
import java.util.List;
39+
import java.util.Optional;
3940
import java.util.concurrent.CompletableFuture;
4041
import java.util.concurrent.ConcurrentHashMap;
4142
import java.util.concurrent.ConcurrentMap;
@@ -49,18 +50,20 @@
4950
import org.slf4j.LoggerFactory;
5051
import org.tikv.common.TiConfiguration.KVMode;
5152
import org.tikv.common.codec.Codec.BytesCodec;
53+
import org.tikv.common.codec.CodecDataInput;
5254
import org.tikv.common.codec.CodecDataOutput;
5355
import org.tikv.common.codec.KeyUtils;
5456
import org.tikv.common.exception.GrpcException;
5557
import org.tikv.common.exception.TiClientInternalException;
5658
import org.tikv.common.meta.TiTimestamp;
5759
import org.tikv.common.operation.NoopHandler;
5860
import org.tikv.common.operation.PDErrorHandler;
59-
import org.tikv.common.region.TiRegion;
6061
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
6162
import org.tikv.common.util.BackOffer;
6263
import org.tikv.common.util.ChannelFactory;
6364
import org.tikv.common.util.ConcreteBackOffer;
65+
import org.tikv.common.util.Pair;
66+
import org.tikv.kvproto.Metapb;
6467
import org.tikv.kvproto.Metapb.Store;
6568
import org.tikv.kvproto.PDGrpc;
6669
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
@@ -145,7 +148,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
145148
*
146149
* @param region represents a region info
147150
*/
148-
void scatterRegion(TiRegion region, BackOffer backOffer) {
151+
void scatterRegion(Metapb.Region region, BackOffer backOffer) {
149152
Supplier<ScatterRegionRequest> request =
150153
() ->
151154
ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
@@ -169,7 +172,7 @@ void scatterRegion(TiRegion region, BackOffer backOffer) {
169172
*
170173
* @param region
171174
*/
172-
void waitScatterRegionFinish(TiRegion region, BackOffer backOffer) {
175+
void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
173176
for (; ; ) {
174177
GetOperatorResponse resp = getOperator(region.getId());
175178
if (resp != null) {
@@ -222,7 +225,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
222225
}
223226

224227
@Override
225-
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
228+
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
226229
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
227230
try {
228231
if (conf.getKvMode() == KVMode.TXN) {
@@ -240,37 +243,22 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
240243

241244
GetRegionResponse resp =
242245
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
243-
return new TiRegion(
244-
resp.getRegion(),
245-
resp.getLeader(),
246-
null,
247-
conf.getIsolationLevel(),
248-
conf.getCommandPriority(),
249-
conf.getKvMode(),
250-
conf.getReplicaSelector());
246+
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
251247
} finally {
252248
requestTimer.observeDuration();
253249
}
254250
}
255251

256252
@Override
257-
public TiRegion getRegionByID(BackOffer backOffer, long id) {
253+
public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
258254
Supplier<GetRegionByIDRequest> request =
259255
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
260256
PDErrorHandler<GetRegionResponse> handler =
261257
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);
262258

263259
GetRegionResponse resp =
264260
callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
265-
// Instead of using default leader instance, explicitly set no leader to null
266-
return new TiRegion(
267-
resp.getRegion(),
268-
resp.getLeader(),
269-
null,
270-
conf.getIsolationLevel(),
271-
conf.getCommandPriority(),
272-
conf.getKvMode(),
273-
conf.getReplicaSelector());
261+
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
274262
}
275263

276264
private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
@@ -569,12 +557,15 @@ private void initCluster() {
569557
.setDaemon(true)
570558
.build()))
571559
.build();
572-
this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName());
560+
this.hostMapping =
561+
Optional.ofNullable(getConf().getHostMapping())
562+
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
573563
for (URI u : pdAddrs) {
574564
resp = getMembers(u);
575565
if (resp != null) {
576566
break;
577567
}
568+
logger.info("Could not get leader member with pd: " + u);
578569
}
579570
checkNotNull(resp, "Failed to init client for PD cluster.");
580571
long clusterId = resp.getHeader().getClusterId();
@@ -668,4 +659,29 @@ public String toString() {
668659
return "[leaderInfo: " + leaderInfo + "]";
669660
}
670661
}
662+
663+
private Metapb.Region decodeRegion(Metapb.Region region) {
664+
final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
665+
Metapb.Region.Builder builder =
666+
Metapb.Region.newBuilder()
667+
.setId(region.getId())
668+
.setRegionEpoch(region.getRegionEpoch())
669+
.addAllPeers(region.getPeersList());
670+
671+
if (region.getStartKey().isEmpty() || isRawRegion) {
672+
builder.setStartKey(region.getStartKey());
673+
} else {
674+
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
675+
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
676+
}
677+
678+
if (region.getEndKey().isEmpty() || isRawRegion) {
679+
builder.setEndKey(region.getEndKey());
680+
} else {
681+
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
682+
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
683+
}
684+
685+
return builder.build();
686+
}
671687
}

src/main/java/org/tikv/common/ReadOnlyPDClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import com.google.protobuf.ByteString;
1919
import java.util.List;
2020
import org.tikv.common.meta.TiTimestamp;
21-
import org.tikv.common.region.TiRegion;
2221
import org.tikv.common.util.BackOffer;
22+
import org.tikv.common.util.Pair;
23+
import org.tikv.kvproto.Metapb;
2324
import org.tikv.kvproto.Metapb.Store;
2425

2526
/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
@@ -37,15 +38,15 @@ public interface ReadOnlyPDClient {
3738
* @param key key in bytes for locating a region
3839
* @return the region whose startKey and endKey range covers the given key
3940
*/
40-
TiRegion getRegionByKey(BackOffer backOffer, ByteString key);
41+
Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key);
4142

4243
/**
4344
* Get Region by Region Id
4445
*
4546
* @param id Region Id
4647
* @return the region corresponding to the given Id
4748
*/
48-
TiRegion getRegionByID(BackOffer backOffer, long id);
49+
Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id);
4950

5051
HostMapping getHostMapping();
5152

src/main/java/org/tikv/common/TiConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ private static ReplicaRead getReplicaRead(String key) {
264264
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
265265

266266
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
267+
private HostMapping hostMapping = null;
267268

268269
public enum KVMode {
269270
TXN,
@@ -542,6 +543,14 @@ public String getNetworkMappingName() {
542543
return this.networkMappingName;
543544
}
544545

546+
public HostMapping getHostMapping() {
547+
return hostMapping;
548+
}
549+
550+
public void setHostMapping(HostMapping mapping) {
551+
this.hostMapping = mapping;
552+
}
553+
545554
public boolean getEnableGrpcForward() {
546555
return this.enableGrpcForward;
547556
}

0 commit comments

Comments
 (0)