|
3 | 3 | import com.google.protobuf.ByteString; |
4 | 4 | import io.grpc.Status; |
5 | 5 | import io.grpc.StatusRuntimeException; |
| 6 | +import java.util.ArrayList; |
| 7 | +import java.util.List; |
6 | 8 | import java.util.function.Function; |
7 | 9 | import org.slf4j.Logger; |
8 | 10 | import org.slf4j.LoggerFactory; |
9 | 11 | import org.tikv.common.codec.KeyUtils; |
10 | 12 | import org.tikv.common.exception.GrpcException; |
| 13 | +import org.tikv.common.exception.TiKVException; |
11 | 14 | import org.tikv.common.region.RegionErrorReceiver; |
12 | 15 | import org.tikv.common.region.RegionManager; |
13 | 16 | import org.tikv.common.region.TiRegion; |
14 | 17 | import org.tikv.common.util.BackOffFunction; |
15 | 18 | import org.tikv.common.util.BackOffer; |
16 | 19 | import org.tikv.kvproto.Errorpb; |
| 20 | +import org.tikv.kvproto.Metapb; |
17 | 21 |
|
18 | 22 | public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> { |
19 | 23 | private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class); |
@@ -115,11 +119,11 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { |
115 | 119 | // throwing it out. |
116 | 120 | return false; |
117 | 121 | } else if (error.hasEpochNotMatch()) { |
118 | | - // this error is reported from raftstore: |
119 | | - // region has outdated version,please try later. |
120 | | - logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); |
121 | | - this.regionManager.onRegionStale(recv.getRegion()); |
122 | | - return false; |
| 122 | + logger.warn( |
| 123 | + String.format( |
| 124 | + "tikv reports `EpochNotMatch` retry later, region: %s, EpochNotMatch: %s", |
| 125 | + recv.getRegion(), error.getEpochNotMatch())); |
| 126 | + return onRegionEpochNotMatch(backOffer, error.getEpochNotMatch().getCurrentRegionsList()); |
123 | 127 | } else if (error.hasServerIsBusy()) { |
124 | 128 | // this error is reported from kv: |
125 | 129 | // will occur when write pressure is high. Please try later. |
@@ -171,6 +175,54 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { |
171 | 175 | return false; |
172 | 176 | } |
173 | 177 |
|
| 178 | + // ref: https://github.com/tikv/client-go/blob/tidb-5.2/internal/locate/region_request.go#L985 |
| 179 | + // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache. |
| 180 | + // It returns whether retries the request because it's possible the region epoch is ahead of |
| 181 | + // TiKV's due to slow appling. |
| 182 | + private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) { |
| 183 | + if (currentRegions.size() == 0) { |
| 184 | + this.regionManager.onRegionStale(recv.getRegion()); |
| 185 | + return false; |
| 186 | + } |
| 187 | + |
| 188 | + // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff. |
| 189 | + for (Metapb.Region meta : currentRegions) { |
| 190 | + if (meta.getId() == recv.getRegion().getId() |
| 191 | + && (meta.getRegionEpoch().getConfVer() < recv.getRegion().getVerID().getConfVer() |
| 192 | + || meta.getRegionEpoch().getVersion() < recv.getRegion().getVerID().getVer())) { |
| 193 | + String errorMsg = |
| 194 | + String.format( |
| 195 | + "region epoch is ahead of tikv, region: %s, currentRegions: %s", |
| 196 | + recv.getRegion(), currentRegions); |
| 197 | + logger.info(errorMsg); |
| 198 | + backOffer.doBackOff( |
| 199 | + BackOffFunction.BackOffFuncType.BoRegionMiss, new TiKVException(errorMsg)); |
| 200 | + return true; |
| 201 | + } |
| 202 | + } |
| 203 | + |
| 204 | + boolean needInvalidateOld = true; |
| 205 | + List<TiRegion> newRegions = new ArrayList<>(currentRegions.size()); |
| 206 | + // If the region epoch is not ahead of TiKV's, replace region meta in region cache. |
| 207 | + for (Metapb.Region meta : currentRegions) { |
| 208 | + TiRegion region = regionManager.createRegion(meta, backOffer); |
| 209 | + newRegions.add(region); |
| 210 | + if (recv.getRegion().getVerID() == region.getVerID()) { |
| 211 | + needInvalidateOld = false; |
| 212 | + } |
| 213 | + } |
| 214 | + |
| 215 | + if (needInvalidateOld) { |
| 216 | + this.regionManager.onRegionStale(recv.getRegion()); |
| 217 | + } |
| 218 | + |
| 219 | + for (TiRegion region : newRegions) { |
| 220 | + regionManager.insertRegionToCache(region); |
| 221 | + } |
| 222 | + |
| 223 | + return false; |
| 224 | + } |
| 225 | + |
174 | 226 | @Override |
175 | 227 | public boolean handleRequestError(BackOffer backOffer, Exception e) { |
176 | 228 | if (recv.onStoreUnreachable()) { |
|
0 commit comments