Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) {

return builder.build();
}

public long getClusterId() {
return header.getClusterId();
}

public List<URI> getPdAddrs() {
return pdAddrs;
}
}
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.tikv.common.log;

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public interface SlowLog {

SlowLogSpan start(String name);

long getTraceId();
Expand All @@ -26,5 +30,11 @@ public interface SlowLog {

void setError(Throwable err);

SlowLog withFields(Map<String, Object> fields);

default SlowLog withField(String key, Object value) {
return withFields(ImmutableMap.of(key, value));
}

void log();
}
7 changes: 7 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.tikv.common.log;

import java.util.Map;

public class SlowLogEmptyImpl implements SlowLog {
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();

Expand All @@ -40,6 +42,11 @@ public long getThresholdMS() {
@Override
public void setError(Throwable err) {}

@Override
public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public void log() {}
}
30 changes: 30 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowLogImpl implements SlowLog {

private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);

private static final int MAX_SPAN_SIZE = 1024;

private static final Random random = new Random();

private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
private final HashMap<String, Object> fields = new HashMap<>();
private Throwable error = null;

private final long startMS;
Expand Down Expand Up @@ -81,6 +86,12 @@ public void setError(Throwable err) {
this.error = err;
}

@Override
public SlowLog withFields(Map<String, Object> fields) {
this.fields.putAll(fields);
return this;
}

@Override
public void log() {
recordTime();
Expand Down Expand Up @@ -120,6 +131,25 @@ JsonObject getSlowLogJson() {
}
jsonObject.add("spans", jsonArray);

for (Entry<String, Object> entry : fields.entrySet()) {
Object value = entry.getValue();
if (value instanceof List) {
JsonArray field = new JsonArray();
for (Object o : (List<?>) value) {
field.add(o.toString());
}
jsonObject.add(entry.getKey(), field);
} else if (value instanceof Map) {
JsonObject field = new JsonObject();
for (Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
field.addProperty(e.getKey().toString(), e.getValue().toString());
}
jsonObject.add(entry.getKey(), field);
} else {
jsonObject.addProperty(entry.getKey(), value.toString());
}
}

return jsonObject;
}

Expand Down
61 changes: 47 additions & 14 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,31 @@

package org.tikv.raw;

import static org.tikv.common.util.ClientUtils.*;
import static org.tikv.common.util.ClientUtils.appendBatches;
import static org.tikv.common.util.ClientUtils.genUUID;
import static org.tikv.common.util.ClientUtils.getBatches;
import static org.tikv.common.util.ClientUtils.getTasks;
import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;

import com.google.protobuf.ByteString;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.concurrent.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,10 +62,19 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.*;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.DeleteRange;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb.KvPair;

public class RawKVClient implements RawKVClientBase {
private final long clusterId;
private final List<URI> pdAddresses;
private final TiSession tiSession;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
Expand Down Expand Up @@ -95,6 +122,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
this.atomicForCAS = conf.isEnableAtomicForCAS();
this.clusterId = session.getPDClient().getClusterId();
this.pdAddresses = session.getPDClient().getPdAddrs();
}

private SlowLog withClusterInfo(SlowLog logger) {
return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
}

@Override
Expand All @@ -110,7 +143,7 @@ public void put(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("put");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -172,7 +205,7 @@ public void compareAndSet(
String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("putIfAbsent");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -211,7 +244,7 @@ public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchPut");
span.addProperty("keySize", String.valueOf(kvPairs.size()));

Expand All @@ -237,7 +270,7 @@ public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("get");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -270,7 +303,7 @@ public Optional<ByteString> get(ByteString key) {
public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchGet");
span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer =
Expand All @@ -295,7 +328,7 @@ public List<KvPair> batchGet(List<ByteString> keys) {
public void batchDelete(List<ByteString> keys) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchDelete");
span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer =
Expand All @@ -320,7 +353,7 @@ public void batchDelete(List<ByteString> keys) {
public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("getKeyTTL");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer =
Expand Down Expand Up @@ -428,7 +461,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -473,7 +506,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey) {
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -539,7 +572,7 @@ public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
public void delete(ByteString key) {
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("delete");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer =
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/org/tikv/common/log/SlowLogImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.tikv.common.log;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -63,4 +66,24 @@ public void testUnsignedLong() {
Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString());
Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString());
}

@Test
public void testWithFields() throws InterruptedException {
SlowLogImpl slowLog = new SlowLogImpl(1);
slowLog
.withField("key0", "value0")
.withField("key1", ImmutableList.of("value0", "value1"))
.withField("key2", ImmutableMap.of("key3", "value3"));

JsonObject object = slowLog.getSlowLogJson();
Assert.assertEquals("value0", object.get("key0").getAsString());

AtomicInteger i = new AtomicInteger();
object
.get("key1")
.getAsJsonArray()
.forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString()));

Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString());
}
}