Skip to content

Commit 86606df

Browse files
[close #433] fix calling getStoreById without backoffer (#434)
1 parent 468e999 commit 86606df

File tree

7 files changed

+23
-24
lines changed

7 files changed

+23
-24
lines changed

src/main/java/org/tikv/common/TiSession.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,34 +161,31 @@ private static VersionInfo getVersionInfo() {
161161
}
162162

163163
private synchronized void warmUp() {
164-
long warmUpStartTime = System.currentTimeMillis();
164+
long warmUpStartTime = System.nanoTime();
165+
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
165166
try {
166167
this.client = getPDClient();
167168
this.regionManager = getRegionManager();
168-
List<Metapb.Store> stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff());
169+
List<Metapb.Store> stores = this.client.getAllStores(backOffer);
169170
// warm up store cache
170171
for (Metapb.Store store : stores) {
171172
this.regionManager.updateStore(
172-
null,
173-
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
173+
null, new TiStore(this.client.getStore(backOffer, store.getId())));
174174
}
175175

176176
// use scan region to load region cache with limit
177177
ByteString startKey = ByteString.EMPTY;
178178
do {
179179
List<Pdpb.Region> regions =
180180
regionManager.scanRegions(
181-
ConcreteBackOffer.newGetBackOff(),
182-
startKey,
183-
ByteString.EMPTY,
184-
conf.getScanRegionsLimit());
181+
backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit());
185182
if (regions == null || regions.isEmpty()) {
186183
// something went wrong, but the warm-up process could continue
187184
break;
188185
}
189186
for (Pdpb.Region region : regions) {
190187
regionManager.insertRegionToCache(
191-
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
188+
regionManager.createRegion(region.getRegion(), backOffer));
192189
}
193190
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
194191
} while (!startKey.isEmpty());
@@ -211,7 +208,8 @@ private synchronized void warmUp() {
211208
logger.info("warm up fails, ignored ", e);
212209
} finally {
213210
logger.info(
214-
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
211+
String.format(
212+
"warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
215213
}
216214
}
217215

src/main/java/org/tikv/common/operation/RegionErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
7676
// onNotLeader is only needed when updateLeader succeeds, thus switch
7777
// to a new store address.
7878
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
79-
retry = newRegion != null && recv.onNotLeader(newRegion);
79+
retry = newRegion != null && recv.onNotLeader(newRegion, backOffer);
8080

8181
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
8282
} else {

src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void close() throws GrpcException {}
108108
* @return false when re-split is needed.
109109
*/
110110
@Override
111-
public boolean onNotLeader(TiRegion newRegion) {
111+
public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
112112
if (logger.isDebugEnabled()) {
113113
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
114114
}
@@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) {
123123
store = null;
124124
}
125125
region = newRegion;
126-
store = regionManager.getStoreById(region.getLeader().getStoreId());
126+
store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
127127
updateClientStub();
128128
return true;
129129
}
@@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {
193193

194194
logger.info(String.format("try switch leader: region[%d]", region.getId()));
195195

196-
Metapb.Peer peer = switchLeaderStore();
196+
Metapb.Peer peer = switchLeaderStore(backOffer);
197197
if (peer != null) {
198198
// we found a leader
199-
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
199+
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
200200
if (currentLeaderStore.isReachable()) {
201201
logger.info(
202202
String.format(
@@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) {
232232
try {
233233
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
234234
// when current leader cannot be reached
235-
TiStore storeWithProxy = switchProxyStore();
235+
TiStore storeWithProxy = switchProxyStore(backOffer);
236236
if (storeWithProxy == null) {
237237
// no store available, retry
238238
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
@@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) {
250250
}
251251

252252
// first: leader peer, second: true if any responses returned with grpc error
253-
private Metapb.Peer switchLeaderStore() {
253+
private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
254254
List<SwitchLeaderTask> responses = new LinkedList<>();
255255
for (Metapb.Peer peer : region.getFollowerList()) {
256256
ByteString key = region.getStartKey();
257-
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
257+
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
258258
ManagedChannel channel =
259259
channelFactory.getChannel(
260260
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
@@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() {
300300
}
301301
}
302302

303-
private TiStore switchProxyStore() {
303+
private TiStore switchProxyStore(BackOffer backOffer) {
304304
long forwardTimeout = conf.getForwardTimeout();
305305
List<ForwardCheckTask> responses = new LinkedList<>();
306306
for (Metapb.Peer peer : region.getFollowerList()) {
307307
ByteString key = region.getStartKey();
308-
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
308+
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
309309
ManagedChannel channel =
310310
channelFactory.getChannel(
311311
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());

src/main/java/org/tikv/common/region/RegionErrorReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.tikv.common.util.BackOffer;
2121

2222
public interface RegionErrorReceiver {
23-
boolean onNotLeader(TiRegion region);
23+
boolean onNotLeader(TiRegion region, BackOffer backOffer);
2424

2525
/// return whether we need to retry this request.
2626
boolean onStoreUnreachable(BackOffer backOffer);

src/main/java/org/tikv/common/region/RegionStoreClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public List<KvPair> scan(
274274
boolean forWrite = false;
275275
while (true) {
276276
// we should refresh region
277-
region = regionManager.getRegionByKey(startKey);
277+
region = regionManager.getRegionByKey(startKey, backOffer);
278278

279279
Supplier<ScanRequest> request =
280280
() ->

src/main/java/org/tikv/common/util/ConcreteBackOffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
168168
}
169169

170170
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
171-
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
172-
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
173171
BackOffFunction backOffFunction =
174172
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
175173

@@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long
185183
}
186184
}
187185

186+
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
187+
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
188188
try {
189189
Thread.sleep(sleep);
190190
} catch (InterruptedException e) {

src/test/java/org/tikv/BaseRawKVTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ protected TiConfiguration createTiConfiguration() {
1414
conf.setTest(true);
1515
conf.setEnableAtomicForCAS(true);
1616
conf.setEnableGrpcForward(false);
17+
conf.setEnableAtomicForCAS(true);
1718
return conf;
1819
}
1920
}

0 commit comments

Comments
 (0)