Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit 7e16d5d

Browse files
committed
implements client-side rowkey translation on import; some cleanup
1 parent 4f922ff commit 7e16d5d

10 files changed

Lines changed: 3847 additions & 1972 deletions

File tree

com.pilosa.client/src/integration-test/java/integrationtest/PilosaClientIT.java

Lines changed: 114 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ public void importRowKeyColumnIDTest() throws IOException {
546546
LineDeserializer deserializer = new RowKeyColumnIDDeserializer();
547547
RecordIterator iterator = csvRecordIterator("row_key-column_id.csv", deserializer);
548548
FieldOptions fieldOptions = FieldOptions.builder()
549-
.keys(true)
549+
.setKeys(true)
550550
.build();
551551
Field field = this.index.field("importfield-rowkey-colid", fieldOptions);
552552
client.ensureField(field);
@@ -567,6 +567,37 @@ public void importRowKeyColumnIDTest() throws IOException {
567567
}
568568
}
569569

570+
@Test
571+
public void fastImportRowKeyColumnIDTest() throws IOException {
572+
try (PilosaClient client = this.getClient()) {
573+
LineDeserializer deserializer = new RowKeyColumnIDDeserializer();
574+
RecordIterator iterator = csvRecordIterator("row_key-column_id.csv", deserializer);
575+
FieldOptions fieldOptions = FieldOptions.builder()
576+
.setKeys(true)
577+
.build();
578+
Field field = this.index.field("importfield-rowkey-colid", fieldOptions);
579+
client.ensureField(field);
580+
ImportOptions importOptions = ImportOptions.builder()
581+
.setRoaring(true)
582+
.setTranslateKeys(true)
583+
.build();
584+
client.importField(field, iterator, importOptions);
585+
PqlBatchQuery bq = index.batchQuery(
586+
field.row("one"),
587+
field.row("five"),
588+
field.row("three")
589+
);
590+
QueryResponse response = client.query(bq);
591+
592+
List<Long> target = Arrays.asList(10L, 20L, 41L);
593+
List<QueryResult> results = response.getResults();
594+
for (int i = 0; i < results.size(); i++) {
595+
RowResult br = results.get(i).getRow();
596+
assertEquals(target.get(i), br.getColumns().get(0));
597+
}
598+
}
599+
}
600+
570601
@Test
571602
public void importRowKeyColumnKeyTest() throws IOException {
572603
try (PilosaClient client = this.getClient()) {
@@ -644,7 +675,7 @@ public void importRoaringTest() throws IOException {
644675
@Test
645676
public void importRoaringTimeFieldTest() throws IOException {
646677
try (PilosaClient client = this.getClient()) {
647-
RecordIterator iterator = StaticColumnIterator.columnsWithIDs();
678+
RecordIterator iterator = StaticColumnIteratorWithTimestamp.columnsWithIDs();
648679
FieldOptions fieldOptions = FieldOptions.builder()
649680
.fieldTime(TimeQuantum.YEAR_MONTH_DAY_HOUR)
650681
.build();
@@ -736,14 +767,14 @@ public void importFieldValuesWithKeysTest() throws IOException {
736767
RecordIterator iterator = StaticColumnIterator.fieldValuesWithKeys();
737768
FieldOptions options = FieldOptions.builder()
738769
.fieldInt(0, 100)
739-
.keys(true)
770+
.setKeys(true)
740771
.build();
741772
Field field = this.keyIndex.field("importvaluefieldkeys", options);
742773
client.ensureField(field);
743774
client.importField(field, iterator);
744775

745776
FieldOptions options2 = FieldOptions.builder()
746-
.keys(true)
777+
.setKeys(true)
747778
.build();
748779
Field field2 = this.keyIndex.field("importvaluefieldkeys-set", options2);
749780
client.ensureField(field2);
@@ -765,7 +796,6 @@ public void importTestWithBatch() throws IOException {
765796
Field field = this.index.field("importfield");
766797
client.ensureField(field);
767798
ImportOptions options = ImportOptions.builder().
768-
setStrategy(ImportOptions.Strategy.BATCH).
769799
setBatchSize(3).
770800
setThreadCount(1).
771801
build();
@@ -848,8 +878,6 @@ public void run() {
848878
ImportOptions options = ImportOptions.builder()
849879
.setBatchSize(100000)
850880
.setThreadCount(2)
851-
.setStrategy(ImportOptions.Strategy.TIMEOUT)
852-
.setTimeoutMs(5)
853881
.build();
854882
client.importField(field, iterator, options, statusQueue);
855883
monitorThread.interrupt();
@@ -873,7 +901,6 @@ public void run() {
873901
this.client.ensureField(field);
874902

875903
ImportOptions options = ImportOptions.builder()
876-
.setStrategy(ImportOptions.Strategy.BATCH)
877904
.setBatchSize(500)
878905
.setThreadCount(1)
879906
.build();
@@ -920,7 +947,6 @@ public void run() {
920947
this.client.ensureField(field);
921948

922949
ImportOptions options = ImportOptions.builder()
923-
.setStrategy(ImportOptions.Strategy.BATCH)
924950
.setBatchSize(1_000)
925951
.setThreadCount(1)
926952
.build();
@@ -1109,6 +1135,25 @@ public void warningResponseTest() throws IOException, InterruptedException {
11091135
}
11101136
}
11111137

1138+
@Test
1139+
public void translateRowKeysTest() throws IOException {
1140+
try (PilosaClient client = getClient()) {
1141+
FieldOptions options = FieldOptions.builder()
1142+
.setKeys(true)
1143+
.build();
1144+
Field field = this.index.field("translate-rowkey-field", options);
1145+
client.syncSchema(this.schema);
1146+
client.query(this.index.batchQuery(
1147+
field.set("key1", 10),
1148+
field.set("key2", 1000)
1149+
));
1150+
1151+
List<Long> rowIDs = client.translateKeys(field, Arrays.asList("key1", "key2"));
1152+
List<Long> target = Arrays.asList(1L, 2L);
1153+
assertEquals(target, rowIDs);
1154+
}
1155+
}
1156+
11121157
@Test(expected = PilosaException.class)
11131158
public void importFailNot200() throws IOException {
11141159
HttpServer server = runImportFailsHttpServer();
@@ -1556,9 +1601,6 @@ private PilosaClient getClient() {
15561601
String bindAddress = getBindAddress();
15571602
Cluster cluster = Cluster.withHost(URI.address(bindAddress));
15581603
ClientOptions.Builder optionsBuilder = ClientOptions.builder();
1559-
if (isLegacyModeOff()) {
1560-
optionsBuilder.setLegacyMode(false);
1561-
}
15621604
long shardWidth = getShardWidth();
15631605
if (shardWidth > 0) {
15641606
optionsBuilder.setShardWidth(shardWidth);
@@ -1574,11 +1616,6 @@ private String getBindAddress() {
15741616
return bindAddress;
15751617
}
15761618

1577-
private boolean isLegacyModeOff() {
1578-
String legacyModeOffStr = System.getenv("LEGACY_MODE_OFF");
1579-
return legacyModeOffStr != null && legacyModeOffStr.equals("true");
1580-
}
1581-
15821619
private long getShardWidth() {
15831620
String shardWidthStr = System.getenv("SHARD_WIDTH");
15841621
return (shardWidthStr == null) ? 0 : Long.parseLong(shardWidthStr);
@@ -1617,6 +1654,66 @@ public static StaticColumnIterator fieldValuesWithKeys() {
16171654
}
16181655

16191656
private StaticColumnIterator(boolean keys, boolean intValues) {
1657+
this.records = new ArrayList<>(3);
1658+
if (keys) {
1659+
if (intValues) {
1660+
this.records.add(FieldValue.create("ten", 7));
1661+
this.records.add(FieldValue.create("seven", 1));
1662+
} else {
1663+
this.records.add(Column.create("ten", "five"));
1664+
this.records.add(Column.create("two", "three"));
1665+
this.records.add(Column.create("seven", "one"));
1666+
1667+
}
1668+
} else {
1669+
if (intValues) {
1670+
this.records.add(FieldValue.create(10, 7));
1671+
this.records.add(FieldValue.create(7, 1));
1672+
} else {
1673+
this.records.add(Column.create(10, 5));
1674+
this.records.add(Column.create(2, 3));
1675+
this.records.add(Column.create(7, 1));
1676+
}
1677+
}
1678+
}
1679+
1680+
@Override
1681+
public boolean hasNext() {
1682+
return this.index < this.records.size();
1683+
}
1684+
1685+
@Override
1686+
public Record next() {
1687+
return this.records.get(index++);
1688+
}
1689+
1690+
@Override
1691+
public void remove() {
1692+
// We have this just to avoid compilation problems on JDK 7
1693+
}
1694+
}
1695+
1696+
class StaticColumnIteratorWithTimestamp implements RecordIterator {
1697+
private List<Record> records;
1698+
private int index = 0;
1699+
1700+
public static StaticColumnIteratorWithTimestamp columnsWithIDs() {
1701+
return new StaticColumnIteratorWithTimestamp(false, false);
1702+
}
1703+
1704+
public static StaticColumnIteratorWithTimestamp columnsWithKeys() {
1705+
return new StaticColumnIteratorWithTimestamp(true, false);
1706+
}
1707+
1708+
public static StaticColumnIteratorWithTimestamp fieldValuesWithIDs() {
1709+
return new StaticColumnIteratorWithTimestamp(false, true);
1710+
}
1711+
1712+
public static StaticColumnIteratorWithTimestamp fieldValuesWithKeys() {
1713+
return new StaticColumnIteratorWithTimestamp(true, true);
1714+
}
1715+
1716+
private StaticColumnIteratorWithTimestamp(boolean keys, boolean intValues) {
16201717
this.records = new ArrayList<>(3);
16211718
if (keys) {
16221719
if (intValues) {

com.pilosa.client/src/internal/public.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
syntax = "proto3";
22

3+
package internal;
4+
35
option java_package = "com.pilosa.client";
46
option java_outer_classname = "Internal";
57

@@ -107,6 +109,16 @@ message ImportValueRequest {
107109
repeated int64 Values = 6;
108110
}
109111

112+
message TranslateKeysRequest {
113+
string Index = 1;
114+
string Field = 2;
115+
repeated string Keys = 3;
116+
}
117+
118+
message TranslateKeysResponse {
119+
repeated uint64 IDs = 3;
120+
}
121+
110122
message ImportRoaringRequestView {
111123
string Name = 1;
112124
bytes Data = 2;

com.pilosa.client/src/main/java/com/pilosa/client/ClientOptions.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,6 @@ public Builder setSslContext(SSLContext sslContext) {
121121
return this;
122122
}
123123

124-
public Builder setSkipVersionCheck() {
125-
this.skipVersionCheck = true;
126-
return this;
127-
}
128-
129-
public Builder setLegacyMode(boolean enable) {
130-
this.legacyMode = enable;
131-
this.skipVersionCheck = true;
132-
return this;
133-
}
134-
135124
public Builder setImportThreadCount(int threadCount) {
136125
this.importThreadCount = threadCount;
137126
return this;
@@ -158,8 +147,6 @@ public ClientOptions build() {
158147
private int connectionPoolSizePerRoute = 10;
159148
private int connectionPoolTotalSize = 100;
160149
private SSLContext sslContext = SSLContexts.createDefault();
161-
private boolean skipVersionCheck = false;
162-
private boolean legacyMode = false;
163150
private int importThreadCount = Runtime.getRuntime().availableProcessors();
164151
private long shardWidth = DEFAULT_SHARD_WIDTH;
165152
}

com.pilosa.client/src/main/java/com/pilosa/client/ImportOptions.java

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,31 +47,24 @@ private Builder() {
4747
}
4848

4949
public ImportOptions build() {
50-
return new ImportOptions(this.threadCount,
51-
this.timeoutMs, this.batchSize, this.strategy,
52-
this.roaring, this.clear);
50+
return new ImportOptions(
51+
this.threadCount,
52+
this.batchSize,
53+
this.roaring,
54+
this.clear,
55+
this.translateKeys);
5356
}
5457

5558
public Builder setThreadCount(int threadCount) {
5659
this.threadCount = threadCount;
5760
return this;
5861
}
5962

60-
public Builder setTimeoutMs(long timeoutMs) {
61-
this.timeoutMs = timeoutMs;
62-
return this;
63-
}
64-
6563
public Builder setBatchSize(int batchSize) {
6664
this.batchSize = batchSize;
6765
return this;
6866
}
6967

70-
public Builder setStrategy(Strategy strategy) {
71-
this.strategy = (strategy == Strategy.DEFAULT) ? Strategy.BATCH : strategy;
72-
return this;
73-
}
74-
7568
public Builder setRoaring(boolean roaring) {
7669
this.roaring = roaring;
7770
return this;
@@ -82,26 +75,28 @@ public Builder setClear(boolean clear) {
8275
return this;
8376
}
8477

78+
public Builder setTranslateKeys(boolean translateKeys) {
79+
this.translateKeys = translateKeys;
80+
return this;
81+
}
82+
8583
private int threadCount = 1;
86-
private long timeoutMs = 100;
8784
private int batchSize = 100000;
88-
private Strategy strategy = Strategy.BATCH;
8985
private boolean roaring = false;
9086
private boolean clear = false;
87+
private boolean translateKeys = false;
9188
}
9289

9390
private ImportOptions(int threadCount,
94-
long timeoutMs,
9591
int batchSize,
96-
Strategy strategy,
9792
boolean roaring,
98-
boolean clear) {
93+
boolean clear,
94+
boolean translateKeys) {
9995
this.threadCount = threadCount;
100-
this.timeoutMs = timeoutMs;
10196
this.batchSize = batchSize;
102-
this.strategy = strategy;
10397
this.roaring = roaring;
10498
this.clear = clear;
99+
this.translateKeys = translateKeys;
105100
}
106101

107102
public static Builder builder() {
@@ -112,18 +107,10 @@ public int getThreadCount() {
112107
return this.threadCount;
113108
}
114109

115-
public long getTimeoutMs() {
116-
return this.timeoutMs;
117-
}
118-
119110
public int getBatchSize() {
120111
return this.batchSize;
121112
}
122113

123-
public Strategy getStrategy() {
124-
return this.strategy;
125-
}
126-
127114
public long getShardWidth() {
128115
return ClientOptions.DEFAULT_SHARD_WIDTH;
129116
}
@@ -136,10 +123,13 @@ public boolean isClear() {
136123
return this.clear;
137124
}
138125

126+
public boolean isTranslateKeys() {
127+
return this.translateKeys;
128+
}
129+
139130
final private int threadCount;
140-
final private long timeoutMs;
141131
final private int batchSize;
142-
final private Strategy strategy;
143132
final private boolean roaring;
144133
final private boolean clear;
134+
final private boolean translateKeys;
145135
}

0 commit comments

Comments
 (0)