Skip to content

Commit a345184

Browse files
ti-srebotpingyu
andauthored
[to #656] Fix scan with lock (#670) #697
Signed-off-by: ti-srebot <ti-srebot@pingcap.com> Signed-off-by: ti-srebot <ti-srebot@pingcap.com> Co-authored-by: Ping Yu <yuping@pingcap.com>
1 parent 82c99e2 commit a345184

File tree

6 files changed

+332
-40
lines changed

6 files changed

+332
-40
lines changed

src/main/java/org/tikv/common/region/RegionStoreClient.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public List<KvPair> scan(
357357
this,
358358
lockResolverClient,
359359
resp -> resp.hasRegionError() ? resp.getRegionError() : null,
360-
resp -> null,
360+
resp -> resp.hasError() ? resp.getError() : null,
361361
resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
362362
version,
363363
forWrite);
@@ -366,13 +366,14 @@ public List<KvPair> scan(
366366
// we need to update region after retry
367367
region = regionManager.getRegionByKey(startKey, backOffer);
368368

369-
if (isScanSuccess(backOffer, resp)) {
370-
return doScan(resp);
369+
if (handleScanResponse(backOffer, resp, version, forWrite)) {
370+
return resp.getPairsList();
371371
}
372372
}
373373
}
374374

375-
private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
375+
private boolean handleScanResponse(
376+
BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
376377
if (resp == null) {
377378
this.regionManager.onRequestFail(region);
378379
throw new TiClientInternalException("ScanResponse failed without a cause");
@@ -381,28 +382,35 @@ private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
381382
backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
382383
return false;
383384
}
384-
return true;
385-
}
386385

387-
// TODO: resolve locks after scan
388-
private List<KvPair> doScan(ScanResponse resp) {
389-
// Check if kvPair contains error, it should be a Lock if hasError is true.
390-
List<KvPair> kvPairs = resp.getPairsList();
391-
List<KvPair> newKvPairs = new ArrayList<>();
392-
for (KvPair kvPair : kvPairs) {
386+
// Resolve locks
387+
// Note: Memory lock conflict is returned by both `ScanResponse.error` &
388+
// `ScanResponse.pairs[0].error`, while other key errors are returned by
389+
// `ScanResponse.pairs.error`
390+
// See https://github.com/pingcap/kvproto/pull/697
391+
List<Lock> locks = new ArrayList<>();
392+
for (KvPair kvPair : resp.getPairsList()) {
393393
if (kvPair.hasError()) {
394394
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
395-
newKvPairs.add(
396-
KvPair.newBuilder()
397-
.setError(kvPair.getError())
398-
.setValue(kvPair.getValue())
399-
.setKey(lock.getKey())
400-
.build());
401-
} else {
402-
newKvPairs.add(codec.decodeKvPair(kvPair));
395+
locks.add(lock);
396+
}
397+
}
398+
if (!locks.isEmpty()) {
399+
ResolveLockResult resolveLockResult =
400+
lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
401+
addResolvedLocks(version, resolveLockResult.getResolvedLocks());
402+
403+
long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
404+
if (msBeforeExpired > 0) {
405+
// if not resolve all locks, we wait and retry
406+
backOffer.doBackOffWithMaxSleep(
407+
BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
403408
}
409+
410+
return false;
404411
}
405-
return Collections.unmodifiableList(newKvPairs);
412+
413+
return true;
406414
}
407415

408416
public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {

src/test/java/org/tikv/common/KVMockServer.java

Lines changed: 168 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.slf4j.Logger;
4747
import org.slf4j.LoggerFactory;
4848
import org.tikv.common.key.Key;
49+
import org.tikv.common.meta.TiTimestamp;
4950
import org.tikv.common.region.TiRegion;
5051
import org.tikv.kvproto.Coprocessor;
5152
import org.tikv.kvproto.Errorpb;
@@ -67,6 +68,10 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
6768

6869
private final Map<Key, Supplier<Kvrpcpb.KeyError.Builder>> keyErrMap = new HashMap<>();
6970

71+
private final Map<Key, Supplier<Kvrpcpb.LockInfo.Builder>> lockMap = new HashMap<>();
72+
private final Map<Long, Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder>> txnStatusMap =
73+
new HashMap<>();
74+
7075
// for KV error
7176
public static final int ABORT = 1;
7277
public static final int RETRY = 2;
@@ -117,9 +122,68 @@ public void putError(String key, Supplier<Errorpb.Error.Builder> builder) {
117122
regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder);
118123
}
119124

125+
public void removeError(String key) {
126+
regionErrMap.remove(toRawKey(key.getBytes(StandardCharsets.UTF_8)));
127+
}
128+
129+
// putWithLock is used to "prewrite" key-value without "commit"
130+
public void putWithLock(
131+
ByteString key, ByteString value, ByteString primaryKey, Long startTs, Long ttl) {
132+
put(key, value);
133+
134+
Kvrpcpb.LockInfo.Builder lock =
135+
Kvrpcpb.LockInfo.newBuilder()
136+
.setPrimaryLock(primaryKey)
137+
.setLockVersion(startTs)
138+
.setKey(key)
139+
.setLockTtl(ttl);
140+
lockMap.put(toRawKey(key), () -> lock);
141+
}
142+
143+
public void removeLock(ByteString key) {
144+
lockMap.remove(toRawKey(key));
145+
}
146+
147+
public boolean hasLock(ByteString key) {
148+
return lockMap.containsKey(toRawKey(key));
149+
}
150+
151+
// putTxnStatus is used to save transaction status
152+
// commitTs > 0: committed
153+
// commitTs == 0 && key is empty: rollback
154+
// commitTs == 0 && key not empty: locked by key
155+
public void putTxnStatus(Long startTs, Long commitTs, ByteString key) {
156+
if (commitTs > 0 || (commitTs == 0 && key.isEmpty())) { // committed || rollback
157+
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
158+
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
159+
.setCommitVersion(commitTs)
160+
.setLockTtl(0)
161+
.setAction(Kvrpcpb.Action.NoAction);
162+
txnStatusMap.put(startTs, () -> txnStatus);
163+
} else { // locked
164+
Kvrpcpb.LockInfo.Builder lock = lockMap.get(toRawKey(key)).get();
165+
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
166+
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
167+
.setCommitVersion(commitTs)
168+
.setLockTtl(lock.getLockTtl())
169+
.setAction(Kvrpcpb.Action.NoAction)
170+
.setLockInfo(lock);
171+
txnStatusMap.put(startTs, () -> txnStatus);
172+
}
173+
}
174+
175+
// putTxnStatus is used to save transaction status
176+
// commitTs > 0: committed
177+
// commitTs == 0: rollback
178+
public void putTxnStatus(Long startTs, Long commitTs) {
179+
putTxnStatus(startTs, commitTs, ByteString.EMPTY);
180+
}
181+
120182
public void clearAllMap() {
121183
dataMap.clear();
122184
regionErrMap.clear();
185+
lockMap.clear();
186+
txnStatusMap.clear();
123187
}
124188

125189
private Errorpb.Error verifyContext(Context context) throws Exception {
@@ -255,9 +319,12 @@ public void kvGet(
255319
return;
256320
}
257321

322+
Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(key);
258323
Supplier<Kvrpcpb.KeyError.Builder> errProvider = keyErrMap.remove(key);
259324
if (errProvider != null) {
260325
builder.setError(errProvider.get().build());
326+
} else if (lock != null) {
327+
builder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
261328
} else {
262329
ByteString value = dataMap.get(key);
263330
builder.setValue(value);
@@ -299,11 +366,17 @@ public void kvScan(
299366
kvs.entrySet()
300367
.stream()
301368
.map(
302-
kv ->
303-
Kvrpcpb.KvPair.newBuilder()
304-
.setKey(kv.getKey().toByteString())
305-
.setValue(kv.getValue())
306-
.build())
369+
kv -> {
370+
Kvrpcpb.KvPair.Builder kvBuilder =
371+
Kvrpcpb.KvPair.newBuilder()
372+
.setKey(kv.getKey().toByteString())
373+
.setValue(kv.getValue());
374+
Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(kv.getKey());
375+
if (lock != null) {
376+
kvBuilder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
377+
}
378+
return kvBuilder.build();
379+
})
307380
.collect(Collectors.toList()));
308381
}
309382
responseObserver.onNext(builder.build());
@@ -354,6 +427,96 @@ public void kvBatchGet(
354427
}
355428
}
356429

430+
@Override
431+
public void kvCheckTxnStatus(
432+
org.tikv.kvproto.Kvrpcpb.CheckTxnStatusRequest request,
433+
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.CheckTxnStatusResponse>
434+
responseObserver) {
435+
logger.info("KVMockServer.kvCheckTxnStatus");
436+
try {
437+
Long startTs = request.getLockTs();
438+
Long currentTs = request.getCurrentTs();
439+
logger.info("kvCheckTxnStatus for txn: " + startTs);
440+
Kvrpcpb.CheckTxnStatusResponse.Builder builder = Kvrpcpb.CheckTxnStatusResponse.newBuilder();
441+
442+
Error e = verifyContext(request.getContext());
443+
if (e != null) {
444+
responseObserver.onNext(builder.setRegionError(e).build());
445+
responseObserver.onCompleted();
446+
return;
447+
}
448+
449+
Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder> txnStatus = txnStatusMap.get(startTs);
450+
if (txnStatus != null) {
451+
Kvrpcpb.CheckTxnStatusResponse resp = txnStatus.get().build();
452+
if (resp.getCommitVersion() == 0
453+
&& resp.getLockTtl() > 0
454+
&& TiTimestamp.extractPhysical(startTs) + resp.getLockInfo().getLockTtl()
455+
< TiTimestamp.extractPhysical(currentTs)) {
456+
ByteString key = resp.getLockInfo().getKey();
457+
logger.info(
458+
String.format(
459+
"kvCheckTxnStatus rollback expired txn: %d, remove lock: %s",
460+
startTs, key.toStringUtf8()));
461+
removeLock(key);
462+
putTxnStatus(startTs, 0L, ByteString.EMPTY);
463+
resp = txnStatusMap.get(startTs).get().build();
464+
}
465+
logger.info("kvCheckTxnStatus resp: " + resp);
466+
responseObserver.onNext(resp);
467+
} else {
468+
builder.setError(
469+
Kvrpcpb.KeyError.newBuilder()
470+
.setTxnNotFound(
471+
Kvrpcpb.TxnNotFound.newBuilder()
472+
.setPrimaryKey(request.getPrimaryKey())
473+
.setStartTs(startTs)));
474+
logger.info("kvCheckTxnStatus, TxnNotFound");
475+
responseObserver.onNext(builder.build());
476+
}
477+
responseObserver.onCompleted();
478+
} catch (Exception e) {
479+
logger.error("kvCheckTxnStatus error: " + e);
480+
responseObserver.onError(Status.INTERNAL.asRuntimeException());
481+
}
482+
}
483+
484+
@Override
485+
public void kvResolveLock(
486+
org.tikv.kvproto.Kvrpcpb.ResolveLockRequest request,
487+
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.ResolveLockResponse> responseObserver) {
488+
logger.info("KVMockServer.kvResolveLock");
489+
try {
490+
Long startTs = request.getStartVersion();
491+
Long commitTs = request.getCommitVersion();
492+
logger.info(
493+
String.format(
494+
"kvResolveLock for txn: %d, commitTs: %d, keys: %d",
495+
startTs, commitTs, request.getKeysCount()));
496+
Kvrpcpb.ResolveLockResponse.Builder builder = Kvrpcpb.ResolveLockResponse.newBuilder();
497+
498+
Error e = verifyContext(request.getContext());
499+
if (e != null) {
500+
responseObserver.onNext(builder.setRegionError(e).build());
501+
responseObserver.onCompleted();
502+
return;
503+
}
504+
505+
if (request.getKeysCount() == 0) {
506+
lockMap.entrySet().removeIf(entry -> entry.getValue().get().getLockVersion() == startTs);
507+
} else {
508+
for (int i = 0; i < request.getKeysCount(); i++) {
509+
removeLock(request.getKeys(i));
510+
}
511+
}
512+
513+
responseObserver.onNext(builder.build());
514+
responseObserver.onCompleted();
515+
} catch (Exception e) {
516+
responseObserver.onError(Status.INTERNAL.asRuntimeException());
517+
}
518+
}
519+
357520
@Override
358521
public void coprocessor(
359522
org.tikv.kvproto.Coprocessor.Request requestWrap,

src/test/java/org/tikv/common/MockServerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class MockServerTest extends PDMockServerTest {
3939
public void setup() throws IOException {
4040
super.setup();
4141

42+
port = GrpcUtils.getFreePort();
43+
4244
Metapb.Region r =
4345
Metapb.Region.newBuilder()
4446
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
@@ -51,7 +53,7 @@ public void setup() throws IOException {
5153
List<Metapb.Store> s =
5254
ImmutableList.of(
5355
Metapb.Store.newBuilder()
54-
.setAddress("localhost:1234")
56+
.setAddress(LOCAL_ADDR + ":" + port)
5557
.setVersion("5.0.0")
5658
.setId(13)
5759
.build());
@@ -70,6 +72,6 @@ public void setup() throws IOException {
7072
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
7173
}
7274
server = new KVMockServer();
73-
port = server.start(region);
75+
server.start(region, port);
7476
}
7577
}

src/test/java/org/tikv/common/PDClientMockTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,12 @@ public void testSwitchLeader() throws Exception {
7474
@Test
7575
public void testTso() throws Exception {
7676
try (PDClient client = session.getPDClient()) {
77+
Long current = System.currentTimeMillis();
7778
TiTimestamp ts = client.getTimestamp(defaultBackOff());
78-
// Test pdServer is set to generate physical == logical + 1
79-
assertEquals(ts.getPhysical(), ts.getLogical() + 1);
79+
// Test pdServer is set to generate physical to current, logical to 1
80+
assertTrue(ts.getPhysical() >= current);
81+
assertTrue(ts.getPhysical() < current + 100);
82+
assertEquals(ts.getLogical(), 1);
8083
}
8184
}
8285

src/test/java/org/tikv/common/PDMockServer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,17 @@ public void getMembers(GetMembersRequest request, StreamObserver<GetMembersRespo
7575
@Override
7676
public StreamObserver<TsoRequest> tso(StreamObserver<TsoResponse> resp) {
7777
return new StreamObserver<TsoRequest>() {
78-
private int physical = 1;
79-
private int logical = 0;
78+
private long physical = System.currentTimeMillis();
79+
private long logical = 0;
80+
81+
private void updateTso() {
82+
logical++;
83+
if (logical >= (1 << 18)) {
84+
logical = 0;
85+
physical++;
86+
}
87+
physical = Math.max(physical, System.currentTimeMillis());
88+
}
8089

8190
@Override
8291
public void onNext(TsoRequest value) {}
@@ -86,7 +95,8 @@ public void onError(Throwable t) {}
8695

8796
@Override
8897
public void onCompleted() {
89-
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical++, logical++));
98+
updateTso();
99+
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical, logical));
90100
resp.onCompleted();
91101
}
92102
};

0 commit comments

Comments
 (0)