1717
1818package org .tikv .raw ;
1919
20- import static org .tikv .common .util .ClientUtils .*;
20+ import static org .tikv .common .util .ClientUtils .appendBatches ;
21+ import static org .tikv .common .util .ClientUtils .genUUID ;
22+ import static org .tikv .common .util .ClientUtils .getBatches ;
23+ import static org .tikv .common .util .ClientUtils .getTasks ;
24+ import static org .tikv .common .util .ClientUtils .getTasksWithOutput ;
25+ import static org .tikv .common .util .ClientUtils .groupKeysByRegion ;
2126
2227import com .google .protobuf .ByteString ;
2328import io .prometheus .client .Counter ;
2429import io .prometheus .client .Histogram ;
25- import java .util .*;
26- import java .util .concurrent .*;
30+ import java .net .URI ;
31+ import java .util .ArrayList ;
32+ import java .util .HashMap ;
33+ import java .util .Iterator ;
34+ import java .util .LinkedList ;
35+ import java .util .List ;
36+ import java .util .Map ;
37+ import java .util .Objects ;
38+ import java .util .Optional ;
39+ import java .util .Queue ;
40+ import java .util .concurrent .ExecutionException ;
41+ import java .util .concurrent .ExecutorCompletionService ;
42+ import java .util .concurrent .ExecutorService ;
43+ import java .util .concurrent .Future ;
44+ import java .util .concurrent .TimeUnit ;
2745import java .util .stream .Collectors ;
2846import org .slf4j .Logger ;
2947import org .slf4j .LoggerFactory ;
4462import org .tikv .common .region .RegionStoreClient ;
4563import org .tikv .common .region .RegionStoreClient .RegionStoreClientBuilder ;
4664import org .tikv .common .region .TiRegion ;
47- import org .tikv .common .util .*;
65+ import org .tikv .common .util .BackOffFunction ;
66+ import org .tikv .common .util .BackOffer ;
67+ import org .tikv .common .util .Batch ;
68+ import org .tikv .common .util .ConcreteBackOffer ;
69+ import org .tikv .common .util .DeleteRange ;
70+ import org .tikv .common .util .HistogramUtils ;
71+ import org .tikv .common .util .Pair ;
72+ import org .tikv .common .util .ScanOption ;
4873import org .tikv .kvproto .Kvrpcpb .KvPair ;
4974
5075public class RawKVClient implements RawKVClientBase {
76+ private final long clusterId ;
77+ private final List <URI > pdAddresses ;
5178 private final TiSession tiSession ;
5279 private final RegionStoreClientBuilder clientBuilder ;
5380 private final TiConfiguration conf ;
@@ -95,6 +122,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
95122 this .batchScanThreadPool = session .getThreadPoolForBatchScan ();
96123 this .deleteRangeThreadPool = session .getThreadPoolForDeleteRange ();
97124 this .atomicForCAS = conf .isEnableAtomicForCAS ();
125+ this .clusterId = session .getPDClient ().getClusterId ();
126+ this .pdAddresses = session .getPDClient ().getPdAddrs ();
127+ }
128+
129+ private SlowLog withClusterInfo (SlowLog logger ) {
130+ return logger .withField ("cluster_id" , clusterId ).withField ("pd_addresses" , pdAddresses );
98131 }
99132
100133 @ Override
@@ -110,7 +143,7 @@ public void put(ByteString key, ByteString value, long ttl) {
110143 String label = "client_raw_put" ;
111144 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
112145
113- SlowLog slowLog = new SlowLogImpl (conf .getRawKVWriteSlowLogInMS ());
146+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVWriteSlowLogInMS () ));
114147 SlowLogSpan span = slowLog .start ("put" );
115148 span .addProperty ("key" , KeyUtils .formatBytesUTF8 (key ));
116149
@@ -172,7 +205,7 @@ public void compareAndSet(
172205 String label = "client_raw_compare_and_set" ;
173206 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
174207
175- SlowLog slowLog = new SlowLogImpl (conf .getRawKVWriteSlowLogInMS ());
208+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVWriteSlowLogInMS () ));
176209 SlowLogSpan span = slowLog .start ("putIfAbsent" );
177210 span .addProperty ("key" , KeyUtils .formatBytesUTF8 (key ));
178211
@@ -211,7 +244,7 @@ public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
211244 String label = "client_raw_batch_put" ;
212245 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
213246
214- SlowLog slowLog = new SlowLogImpl (conf .getRawKVBatchWriteSlowLogInMS ());
247+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVBatchWriteSlowLogInMS () ));
215248 SlowLogSpan span = slowLog .start ("batchPut" );
216249 span .addProperty ("keySize" , String .valueOf (kvPairs .size ()));
217250
@@ -237,7 +270,7 @@ public Optional<ByteString> get(ByteString key) {
237270 String label = "client_raw_get" ;
238271 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
239272
240- SlowLog slowLog = new SlowLogImpl (conf .getRawKVReadSlowLogInMS ());
273+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVReadSlowLogInMS () ));
241274 SlowLogSpan span = slowLog .start ("get" );
242275 span .addProperty ("key" , KeyUtils .formatBytesUTF8 (key ));
243276
@@ -270,7 +303,7 @@ public Optional<ByteString> get(ByteString key) {
270303 public List <KvPair > batchGet (List <ByteString > keys ) {
271304 String label = "client_raw_batch_get" ;
272305 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
273- SlowLog slowLog = new SlowLogImpl (conf .getRawKVBatchReadSlowLogInMS ());
306+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVBatchReadSlowLogInMS () ));
274307 SlowLogSpan span = slowLog .start ("batchGet" );
275308 span .addProperty ("keySize" , String .valueOf (keys .size ()));
276309 ConcreteBackOffer backOffer =
@@ -295,7 +328,7 @@ public List<KvPair> batchGet(List<ByteString> keys) {
295328 public void batchDelete (List <ByteString > keys ) {
296329 String label = "client_raw_batch_delete" ;
297330 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
298- SlowLog slowLog = new SlowLogImpl (conf .getRawKVBatchWriteSlowLogInMS ());
331+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVBatchWriteSlowLogInMS () ));
299332 SlowLogSpan span = slowLog .start ("batchDelete" );
300333 span .addProperty ("keySize" , String .valueOf (keys .size ()));
301334 ConcreteBackOffer backOffer =
@@ -320,7 +353,7 @@ public void batchDelete(List<ByteString> keys) {
320353 public Optional <Long > getKeyTTL (ByteString key ) {
321354 String label = "client_raw_get_key_ttl" ;
322355 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
323- SlowLog slowLog = new SlowLogImpl (conf .getRawKVReadSlowLogInMS ());
356+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVReadSlowLogInMS () ));
324357 SlowLogSpan span = slowLog .start ("getKeyTTL" );
325358 span .addProperty ("key" , KeyUtils .formatBytesUTF8 (key ));
326359 ConcreteBackOffer backOffer =
@@ -428,7 +461,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
428461 public List <KvPair > scan (ByteString startKey , ByteString endKey , int limit , boolean keyOnly ) {
429462 String label = "client_raw_scan" ;
430463 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
431- SlowLog slowLog = new SlowLogImpl (conf .getRawKVScanSlowLogInMS ());
464+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVScanSlowLogInMS () ));
432465 SlowLogSpan span = slowLog .start ("scan" );
433466 span .addProperty ("startKey" , KeyUtils .formatBytesUTF8 (startKey ));
434467 span .addProperty ("endKey" , KeyUtils .formatBytesUTF8 (endKey ));
@@ -473,7 +506,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey) {
473506 public List <KvPair > scan (ByteString startKey , ByteString endKey , boolean keyOnly ) {
474507 String label = "client_raw_scan_without_limit" ;
475508 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
476- SlowLog slowLog = new SlowLogImpl (conf .getRawKVScanSlowLogInMS ());
509+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVScanSlowLogInMS () ));
477510 SlowLogSpan span = slowLog .start ("scan" );
478511 span .addProperty ("startKey" , KeyUtils .formatBytesUTF8 (startKey ));
479512 span .addProperty ("endKey" , KeyUtils .formatBytesUTF8 (endKey ));
@@ -539,7 +572,7 @@ public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
539572 public void delete (ByteString key ) {
540573 String label = "client_raw_delete" ;
541574 Histogram .Timer requestTimer = RAW_REQUEST_LATENCY .labels (label ).startTimer ();
542- SlowLog slowLog = new SlowLogImpl (conf .getRawKVWriteSlowLogInMS ());
575+ SlowLog slowLog = withClusterInfo ( new SlowLogImpl (conf .getRawKVWriteSlowLogInMS () ));
543576 SlowLogSpan span = slowLog .start ("delete" );
544577 span .addProperty ("key" , KeyUtils .formatBytesUTF8 (key ));
545578 ConcreteBackOffer backOffer =
0 commit comments