Skip to content

Commit c8d856d

Browse files
iosmanthusti-srebot
authored andcommitted
cherry pick tikv#571 to release-3.1
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
1 parent cf14239 commit c8d856d

6 files changed

Lines changed: 524 additions & 0 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2022 TiKV Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.tikv.common;
19+
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.protobuf.ByteString;
22+
import java.io.IOException;
23+
import java.net.ServerSocket;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.stream.Collectors;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.tikv.common.region.TiRegion;
30+
import org.tikv.common.region.TiStore;
31+
import org.tikv.kvproto.Metapb;
32+
import org.tikv.kvproto.Pdpb;
33+
34+
public class MockThreeStoresTest extends PDMockServerTest {
35+
36+
protected TiRegion region;
37+
protected List<KVMockServer> servers = new ArrayList<>();
38+
protected List<Metapb.Store> stores;
39+
40+
@Before
41+
@Override
42+
public void setup() throws IOException {
43+
super.setup();
44+
45+
int[] ports = new int[3];
46+
for (int i = 0; i < ports.length; i++) {
47+
try (ServerSocket s = new ServerSocket(0)) {
48+
ports[i] = s.getLocalPort();
49+
}
50+
}
51+
52+
ImmutableList<Metapb.Peer> peers =
53+
ImmutableList.of(
54+
Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(),
55+
Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(),
56+
Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build());
57+
58+
Metapb.Region region =
59+
Metapb.Region.newBuilder()
60+
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
61+
.setId(0xff)
62+
.setStartKey(ByteString.EMPTY)
63+
.setEndKey(ByteString.EMPTY)
64+
.addAllPeers(peers)
65+
.build();
66+
67+
stores =
68+
ImmutableList.of(
69+
Metapb.Store.newBuilder()
70+
.setAddress("127.0.0.1:" + ports[0])
71+
.setVersion("5.0.0")
72+
.setId(0x1)
73+
.build(),
74+
Metapb.Store.newBuilder()
75+
.setAddress("127.0.0.1:" + ports[1])
76+
.setVersion("5.0.0")
77+
.setId(0x2)
78+
.build(),
79+
Metapb.Store.newBuilder()
80+
.setAddress("127.0.0.1:" + ports[2])
81+
.setVersion("5.0.0")
82+
.setId(0x3)
83+
.build());
84+
85+
for (PDMockServer server : pdServers) {
86+
server.addGetRegionListener(
87+
request ->
88+
Pdpb.GetRegionResponse.newBuilder()
89+
.setLeader(peers.get(0))
90+
.setRegion(region)
91+
.build());
92+
server.addGetStoreListener(
93+
(request) -> {
94+
int i = (int) request.getStoreId() - 1;
95+
return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build();
96+
});
97+
}
98+
99+
this.region =
100+
new TiRegion(
101+
session.getConf(),
102+
region,
103+
region.getPeers(0),
104+
region.getPeersList(),
105+
stores.stream().map(TiStore::new).collect(Collectors.toList()));
106+
for (int port : ports) {
107+
KVMockServer server = new KVMockServer();
108+
server.start(this.region, port);
109+
servers.add(server);
110+
}
111+
}
112+
113+
public void put(ByteString key, ByteString value) {
114+
for (KVMockServer server : servers) {
115+
server.put(key, value);
116+
}
117+
}
118+
119+
public void remove(ByteString key, ByteString value) {
120+
for (KVMockServer server : servers) {
121+
server.remove(key);
122+
}
123+
}
124+
125+
@After
126+
public void tearDown() {
127+
for (KVMockServer server : servers) {
128+
server.stop();
129+
}
130+
}
131+
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright 2017 TiKV Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.tikv.common;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.fail;
23+
24+
import com.google.protobuf.ByteString;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import org.junit.Test;
33+
import org.tikv.common.exception.GrpcException;
34+
import org.tikv.common.meta.TiTimestamp;
35+
import org.tikv.common.util.BackOffer;
36+
import org.tikv.common.util.ConcreteBackOffer;
37+
import org.tikv.common.util.Pair;
38+
import org.tikv.kvproto.Metapb;
39+
import org.tikv.kvproto.Metapb.Store;
40+
import org.tikv.kvproto.Metapb.StoreState;
41+
42+
public class PDClientMockTest extends PDMockServerTest {
43+
44+
private static final String LOCAL_ADDR_IPV6 = "[::1]";
45+
public static final String HTTP = "http://";
46+
47+
@Test
48+
public void testCreate() throws Exception {
49+
try (PDClient client = session.getPDClient()) {
50+
assertEquals(
51+
LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo());
52+
assertEquals(CLUSTER_ID, client.getHeader().getClusterId());
53+
}
54+
}
55+
56+
@Test
57+
public void testSwitchLeader() throws Exception {
58+
try (PDClient client = session.getPDClient()) {
59+
// Switch leader to server 1
60+
client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort());
61+
assertEquals(
62+
client.getPdClientWrapper().getLeaderInfo(),
63+
HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort());
64+
}
65+
tearDown();
66+
setup(LOCAL_ADDR_IPV6);
67+
try (PDClient client = session.getPDClient()) {
68+
client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort());
69+
assertEquals(
70+
client.getPdClientWrapper().getLeaderInfo(),
71+
HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort());
72+
}
73+
}
74+
75+
@Test
76+
public void testTso() throws Exception {
77+
try (PDClient client = session.getPDClient()) {
78+
TiTimestamp ts = client.getTimestamp(defaultBackOff());
79+
// Test pdServer is set to generate physical == logical + 1
80+
assertEquals(ts.getPhysical(), ts.getLogical() + 1);
81+
}
82+
}
83+
84+
@Test
85+
public void testGetRegionByKey() throws Exception {
86+
byte[] startKey = new byte[] {1, 0, 2, 4};
87+
byte[] endKey = new byte[] {1, 0, 2, 5};
88+
int confVer = 1026;
89+
int ver = 1027;
90+
leader.addGetRegionListener(
91+
request ->
92+
GrpcUtils.makeGetRegionResponse(
93+
leader.getClusterId(),
94+
GrpcUtils.makeRegion(
95+
1,
96+
ByteString.copyFrom(startKey),
97+
ByteString.copyFrom(endKey),
98+
GrpcUtils.makeRegionEpoch(confVer, ver),
99+
GrpcUtils.makePeer(1, 10),
100+
GrpcUtils.makePeer(2, 20))));
101+
try (PDClient client = session.getPDClient()) {
102+
Pair<Metapb.Region, Metapb.Peer> rl =
103+
client.getRegionByKey(defaultBackOff(), ByteString.EMPTY);
104+
Metapb.Region r = rl.first;
105+
Metapb.Peer l = rl.second;
106+
assertEquals(r.getStartKey(), ByteString.copyFrom(startKey));
107+
assertEquals(r.getEndKey(), ByteString.copyFrom(endKey));
108+
assertEquals(r.getRegionEpoch().getConfVer(), confVer);
109+
assertEquals(r.getRegionEpoch().getVersion(), ver);
110+
assertEquals(1, l.getId());
111+
assertEquals(10, l.getStoreId());
112+
}
113+
}
114+
115+
@Test
116+
public void testGetRegionById() throws Exception {
117+
byte[] startKey = new byte[] {1, 0, 2, 4};
118+
byte[] endKey = new byte[] {1, 0, 2, 5};
119+
int confVer = 1026;
120+
int ver = 1027;
121+
122+
leader.addGetRegionByIDListener(
123+
request ->
124+
GrpcUtils.makeGetRegionResponse(
125+
leader.getClusterId(),
126+
GrpcUtils.makeRegion(
127+
1,
128+
ByteString.copyFrom(startKey),
129+
ByteString.copyFrom(endKey),
130+
GrpcUtils.makeRegionEpoch(confVer, ver),
131+
GrpcUtils.makePeer(1, 10),
132+
GrpcUtils.makePeer(2, 20))));
133+
try (PDClient client = session.getPDClient()) {
134+
Pair<Metapb.Region, Metapb.Peer> rl = client.getRegionByID(defaultBackOff(), 0);
135+
Metapb.Region r = rl.first;
136+
Metapb.Peer l = rl.second;
137+
assertEquals(ByteString.copyFrom(startKey), r.getStartKey());
138+
assertEquals(ByteString.copyFrom(endKey), r.getEndKey());
139+
assertEquals(confVer, r.getRegionEpoch().getConfVer());
140+
assertEquals(ver, r.getRegionEpoch().getVersion());
141+
assertEquals(1, l.getId());
142+
assertEquals(10, l.getStoreId());
143+
}
144+
}
145+
146+
@Test
147+
public void testGetStore() throws Exception {
148+
long storeId = 1;
149+
String testAddress = "testAddress";
150+
leader.addGetStoreListener(
151+
request ->
152+
GrpcUtils.makeGetStoreResponse(
153+
leader.getClusterId(),
154+
GrpcUtils.makeStore(
155+
storeId,
156+
testAddress,
157+
Metapb.StoreState.Up,
158+
GrpcUtils.makeStoreLabel("k1", "v1"),
159+
GrpcUtils.makeStoreLabel("k2", "v2"))));
160+
try (PDClient client = session.getPDClient()) {
161+
Store r = client.getStore(defaultBackOff(), storeId);
162+
assertEquals(storeId, r.getId());
163+
assertEquals(testAddress, r.getAddress());
164+
assertEquals(Metapb.StoreState.Up, r.getState());
165+
assertEquals("k1", r.getLabels(0).getKey());
166+
assertEquals("k2", r.getLabels(1).getKey());
167+
assertEquals("v1", r.getLabels(0).getValue());
168+
assertEquals("v2", r.getLabels(1).getValue());
169+
170+
leader.addGetStoreListener(
171+
request ->
172+
GrpcUtils.makeGetStoreResponse(
173+
leader.getClusterId(),
174+
GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone)));
175+
assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState());
176+
}
177+
}
178+
179+
private BackOffer defaultBackOff() {
180+
return ConcreteBackOffer.newCustomBackOff(1000);
181+
}
182+
183+
@Test
184+
public void testRetryPolicy() throws Exception {
185+
long storeId = 1024;
186+
ExecutorService service = Executors.newCachedThreadPool();
187+
AtomicInteger i = new AtomicInteger();
188+
leader.addGetStoreListener(
189+
request -> {
190+
if (i.getAndIncrement() < 2) {
191+
return null;
192+
} else {
193+
return GrpcUtils.makeGetStoreResponse(
194+
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
195+
}
196+
});
197+
try (PDClient client = session.getPDClient()) {
198+
Callable<Store> storeCallable =
199+
() -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0);
200+
Future<Store> storeFuture = service.submit(storeCallable);
201+
try {
202+
Store r = storeFuture.get(50, TimeUnit.SECONDS);
203+
assertEquals(r.getId(), storeId);
204+
} catch (TimeoutException e) {
205+
fail();
206+
}
207+
208+
// Should fail
209+
AtomicInteger j = new AtomicInteger();
210+
leader.addGetStoreListener(
211+
request -> {
212+
if (j.getAndIncrement() < 6) {
213+
return null;
214+
} else {
215+
return GrpcUtils.makeGetStoreResponse(
216+
leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up));
217+
}
218+
});
219+
220+
try {
221+
client.getStore(defaultBackOff(), 0);
222+
} catch (GrpcException e) {
223+
assertTrue(true);
224+
return;
225+
}
226+
fail();
227+
}
228+
}
229+
}

src/test/java/org/tikv/common/PDMockServer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@
2828
import org.tikv.kvproto.Pdpb.*;
2929

3030
public class PDMockServer extends PDGrpc.PDImplBase {
31+
<<<<<<< HEAD
3132
public int port;
33+
=======
34+
private int port;
35+
>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571)
3236
private long clusterId;
3337

3438
private Server server;
@@ -136,4 +140,8 @@ public void stop() {
136140
public long getClusterId() {
137141
return clusterId;
138142
}
143+
144+
public long getPort() {
145+
return port;
146+
}
139147
}

0 commit comments

Comments
 (0)