Skip to content

Commit 091fdca

Browse files
committed
Revert "reset code"
This reverts commit 9b46463
1 parent f7171d6 commit 091fdca

22 files changed

+847
-348
lines changed

build.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ dependencies {
102102
compile group: 'joda-time', name: 'joda-time', version:'2.9.9'
103103
compile group: 'org.joda', name: 'joda-convert', version:'1.9.2'
104104
testCompile group: 'io.grpc', name: 'grpc-testing', version:'1.7.0'
105-
106-
//remove unused hadoop dependencies
107-
108105
/*compile group: 'org.apache.logging.log4j', name: 'log4j-api', version:'2.8.1'
109106
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.8.1'
110107
compile group: 'org.apache.spark', name: 'spark-core_2.11', version:'2.3.2'

src/main/java/org/tikv/common/AbstractGRPCClient.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,24 @@
2929
import org.tikv.common.policy.RetryPolicy;
3030
import org.tikv.common.streaming.StreamingResponse;
3131
import org.tikv.common.util.BackOffer;
32-
import org.tikv.common.util.ChannelFactory;
3332

3433
public abstract class AbstractGRPCClient<
3534
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
3635
implements AutoCloseable {
3736
protected final Logger logger = Logger.getLogger(this.getClass());
38-
protected final TiConfiguration conf;
39-
protected final ChannelFactory channelFactory;
37+
protected TiSession session;
38+
protected TiConfiguration conf;
4039

41-
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
42-
this.conf = conf;
43-
this.channelFactory = channelFactory;
40+
protected AbstractGRPCClient(TiSession session) {
41+
this.session = session;
42+
this.conf = session.getConf();
4443
}
4544

46-
protected TiConfiguration getConf() {
45+
public TiSession getSession() {
46+
return session;
47+
}
48+
49+
public TiConfiguration getConf() {
4750
return conf;
4851
}
4952

src/main/java/org/tikv/common/PDClient.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.function.Supplier;
35-
import org.tikv.common.TiConfiguration.KVMode;
3635
import org.tikv.common.codec.Codec.BytesCodec;
3736
import org.tikv.common.codec.CodecDataOutput;
3837
import org.tikv.common.exception.GrpcException;
@@ -42,15 +41,13 @@
4241
import org.tikv.common.operation.PDErrorHandler;
4342
import org.tikv.common.region.TiRegion;
4443
import org.tikv.common.util.BackOffer;
45-
import org.tikv.common.util.ChannelFactory;
4644
import org.tikv.common.util.FutureObserver;
4745
import org.tikv.kvproto.Metapb.Store;
4846
import org.tikv.kvproto.PDGrpc;
4947
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
5048
import org.tikv.kvproto.PDGrpc.PDStub;
5149
import org.tikv.kvproto.Pdpb.*;
5250

53-
/** PDClient is thread-safe and suggested to be shared threads */
5451
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
5552
implements ReadOnlyPDClient {
5653
private RequestHeader header;
@@ -76,7 +73,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
7673
@Override
7774
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
7875
Supplier<GetRegionRequest> request;
79-
if (conf.getKvMode() == KVMode.RAW) {
76+
if (conf.getKvMode().equalsIgnoreCase("RAW")) {
8077
request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
8178
} else {
8279
CodecDataOutput cdo = new CodecDataOutput();
@@ -198,8 +195,8 @@ public void close() {
198195
}
199196
}
200197

201-
public static ReadOnlyPDClient create(TiConfiguration conf, ChannelFactory channelFactory) {
202-
return createRaw(conf, channelFactory);
198+
public static ReadOnlyPDClient create(TiSession session) {
199+
return createRaw(session);
203200
}
204201

205202
@VisibleForTesting
@@ -250,7 +247,7 @@ void close() {}
250247

251248
public GetMembersResponse getMembers(HostAndPort url) {
252249
try {
253-
ManagedChannel probChan = channelFactory.getChannel(url.getHostText() + ":" + url.getPort());
250+
ManagedChannel probChan = session.getChannel(url.getHostText() + ":" + url.getPort());
254251
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
255252
GetMembersRequest request =
256253
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
@@ -282,7 +279,7 @@ private boolean createLeaderWrapper(String leaderUrlStr) {
282279
}
283280

284281
// create new Leader
285-
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr);
282+
ManagedChannel clientChannel = session.getChannel(leaderUrlStr);
286283
leaderWrapper =
287284
new LeaderWrapper(
288285
leaderUrlStr,
@@ -333,13 +330,13 @@ protected PDStub getAsyncStub() {
333330
.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
334331
}
335332

336-
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
337-
super(conf, channelFactory);
333+
private PDClient(TiSession session) {
334+
super(session);
338335
}
339336

340337
private void initCluster() {
341338
GetMembersResponse resp = null;
342-
List<HostAndPort> pdAddrs = getConf().getPdAddrs();
339+
List<HostAndPort> pdAddrs = getSession().getConf().getPdAddrs();
343340
for (HostAndPort u : pdAddrs) {
344341
resp = getMembers(u);
345342
if (resp != null) {
@@ -369,10 +366,10 @@ private void initCluster() {
369366
TimeUnit.MINUTES);
370367
}
371368

372-
static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) {
369+
static PDClient createRaw(TiSession session) {
373370
PDClient client = null;
374371
try {
375-
client = new PDClient(conf, channelFactory);
372+
client = new PDClient(session);
376373
client.initCluster();
377374
} catch (Exception e) {
378375
if (client != null) {

src/main/java/org/tikv/common/ReadOnlyPDClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,7 @@ public interface ReadOnlyPDClient {
6060
Store getStore(BackOffer backOffer, long storeId);
6161

6262
Future<Store> getStoreAsync(BackOffer backOffer, long storeId);
63+
64+
/** Get associated session * @return the session associated to client */
65+
TiSession getSession();
6366
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2017 PingCAP, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.tikv.common;
17+
18+
import static org.tikv.common.util.KeyRangeUtils.makeRange;
19+
20+
import com.google.common.collect.Range;
21+
import com.google.protobuf.ByteString;
22+
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import org.tikv.common.exception.TiClientInternalException;
26+
import org.tikv.common.key.Key;
27+
import org.tikv.common.meta.TiTimestamp;
28+
import org.tikv.common.operation.iterator.ConcreteScanIterator;
29+
import org.tikv.common.region.RegionStoreClient;
30+
import org.tikv.common.region.TiRegion;
31+
import org.tikv.common.util.BackOffer;
32+
import org.tikv.common.util.ConcreteBackOffer;
33+
import org.tikv.common.util.Pair;
34+
import org.tikv.kvproto.Kvrpcpb.KvPair;
35+
import org.tikv.kvproto.Metapb.Store;
36+
37+
public class Snapshot {
38+
private final TiTimestamp timestamp;
39+
private final TiSession session;
40+
private final TiConfiguration conf;
41+
42+
public Snapshot(TiTimestamp timestamp, TiSession session) {
43+
this.timestamp = timestamp;
44+
this.session = session;
45+
this.conf = session.getConf();
46+
}
47+
48+
public TiSession getSession() {
49+
return session;
50+
}
51+
52+
public long getVersion() {
53+
return timestamp.getVersion();
54+
}
55+
56+
public TiTimestamp getTimestamp() {
57+
return timestamp;
58+
}
59+
60+
public byte[] get(byte[] key) {
61+
ByteString keyString = ByteString.copyFrom(key);
62+
ByteString value = get(keyString);
63+
return value.toByteArray();
64+
}
65+
66+
public ByteString get(ByteString key) {
67+
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
68+
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, getSession());
69+
// TODO: Need to deal with lock error after grpc stable
70+
return client.get(ConcreteBackOffer.newGetBackOff(), key, timestamp.getVersion());
71+
}
72+
73+
public Iterator<KvPair> scan(ByteString startKey) {
74+
return new ConcreteScanIterator(startKey, session, timestamp.getVersion());
75+
}
76+
77+
// TODO: Need faster implementation, say concurrent version
78+
// Assume keys sorted
79+
public List<KvPair> batchGet(List<ByteString> keys) {
80+
TiRegion curRegion = null;
81+
Range<Key> curKeyRange = null;
82+
Pair<TiRegion, Store> lastPair;
83+
List<ByteString> keyBuffer = new ArrayList<>();
84+
List<KvPair> result = new ArrayList<>(keys.size());
85+
BackOffer backOffer = ConcreteBackOffer.newBatchGetMaxBackOff();
86+
for (ByteString key : keys) {
87+
if (curRegion == null || !curKeyRange.contains(Key.toRawKey(key))) {
88+
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
89+
lastPair = pair;
90+
curRegion = pair.first;
91+
curKeyRange = makeRange(curRegion.getStartKey(), curRegion.getEndKey());
92+
93+
try (RegionStoreClient client =
94+
RegionStoreClient.create(lastPair.first, lastPair.second, getSession())) {
95+
List<KvPair> partialResult =
96+
client.batchGet(backOffer, keyBuffer, timestamp.getVersion());
97+
// TODO: Add lock check
98+
result.addAll(partialResult);
99+
} catch (Exception e) {
100+
throw new TiClientInternalException("Error Closing Store client.", e);
101+
}
102+
keyBuffer = new ArrayList<>();
103+
keyBuffer.add(key);
104+
}
105+
}
106+
return result;
107+
}
108+
}

src/main/java/org/tikv/common/TiConfiguration.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class TiConfiguration implements Serializable {
4444
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC;
4545
private static final boolean DEF_SHOW_ROWID = false;
4646
private static final String DEF_DB_PREFIX = "";
47-
private static final KVMode DEF_KV_MODE = KVMode.TXN;
47+
private static final String DEF_KV_MODE = "KV";
4848
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
4949

5050
private int timeout = DEF_TIMEOUT;
@@ -63,14 +63,9 @@ public class TiConfiguration implements Serializable {
6363
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
6464
private boolean showRowId = DEF_SHOW_ROWID;
6565
private String dbPrefix = DEF_DB_PREFIX;
66-
private KVMode kvMode = DEF_KV_MODE;
66+
private String kvMode = DEF_KV_MODE;
6767
private int rawClientConcurrency = DEF_RAW_CLIENT_CONCURRENCY;
6868

69-
public enum KVMode {
70-
TXN,
71-
RAW
72-
}
73-
7469
public static TiConfiguration createDefault(String pdAddrsStr) {
7570
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
7671
TiConfiguration conf = new TiConfiguration();
@@ -82,7 +77,7 @@ public static TiConfiguration createRawDefault(String pdAddrsStr) {
8277
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
8378
TiConfiguration conf = new TiConfiguration();
8479
conf.pdAddrs = strToHostAndPort(pdAddrsStr);
85-
conf.kvMode = KVMode.RAW;
80+
conf.kvMode = "RAW";
8681
return conf;
8782
}
8883

@@ -234,12 +229,12 @@ public void setDBPrefix(String dbPrefix) {
234229
this.dbPrefix = dbPrefix;
235230
}
236231

237-
public KVMode getKvMode() {
232+
public String getKvMode() {
238233
return kvMode;
239234
}
240235

241236
public void setKvMode(String kvMode) {
242-
this.kvMode = KVMode.valueOf(kvMode);
237+
this.kvMode = kvMode;
243238
}
244239

245240
public int getRawClientConcurrency() {

0 commit comments

Comments
 (0)