|
104 | 104 | import org.tikv.kvproto.Pdpb.Timestamp; |
105 | 105 | import org.tikv.kvproto.Pdpb.TsoRequest; |
106 | 106 | import org.tikv.kvproto.Pdpb.TsoResponse; |
| 107 | +import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest; |
107 | 108 |
|
108 | 109 | public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub> |
109 | 110 | implements ReadOnlyPDClient { |
@@ -383,6 +384,17 @@ private Supplier<GetAllStoresRequest> buildGetAllStoresReq() { |
383 | 384 | return () -> GetAllStoresRequest.newBuilder().setHeader(header).build(); |
384 | 385 | } |
385 | 386 |
|
| 387 | + private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest( |
| 388 | + ByteString serviceId, long ttl, long safePoint) { |
| 389 | + return () -> |
| 390 | + UpdateServiceGCSafePointRequest.newBuilder() |
| 391 | + .setHeader(header) |
| 392 | + .setSafePoint(safePoint) |
| 393 | + .setServiceId(serviceId) |
| 394 | + .setTTL(ttl) |
| 395 | + .build(); |
| 396 | + } |
| 397 | + |
386 | 398 | private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() { |
387 | 399 | return new PDErrorHandler<>( |
388 | 400 | r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this); |
@@ -419,6 +431,20 @@ public TiConfiguration.ReplicaRead getReplicaRead() { |
419 | 431 | return conf.getReplicaRead(); |
420 | 432 | } |
421 | 433 |
|
| 434 | + @Override |
| 435 | + public Long updateServiceGCSafePoint( |
| 436 | + String serviceId, long ttl, long safePoint, BackOffer backOffer) { |
| 437 | + return callWithRetry( |
| 438 | + backOffer, |
| 439 | + PDGrpc.getUpdateServiceGCSafePointMethod(), |
| 440 | + buildUpdateServiceGCSafePointRequest( |
| 441 | + ByteString.copyFromUtf8(serviceId), ttl, safePoint), |
| 442 | + new PDErrorHandler<>( |
| 443 | + r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, |
| 444 | + this)) |
| 445 | + .getMinSafePoint(); |
| 446 | + } |
| 447 | + |
422 | 448 | @Override |
423 | 449 | public void close() throws InterruptedException { |
424 | 450 | etcdClient.close(); |
|
0 commit comments