Skip to content

Commit ab37cc1

Browse files
authored
Pipe: Fixed the bug that sorter may use wrong value when there are duplicated timestamps & Refactor (#15488) (#15500)
1 parent a601933 commit ab37cc1

2 files changed

Lines changed: 104 additions & 70 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java

Lines changed: 96 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ public class PipeTabletEventSorter {
3535
private final Tablet tablet;
3636

3737
private boolean isSorted = true;
38-
private boolean isDeduplicated = true;
38+
private boolean isDeDuplicated = true;
3939

4040
private Integer[] index;
41-
private int deduplicatedSize;
41+
private int[] deDuplicatedIndex;
42+
private int deDuplicatedSize;
4243

4344
public PipeTabletEventSorter(final Tablet tablet) {
4445
this.tablet = tablet;
45-
deduplicatedSize = tablet == null ? 0 : tablet.rowSize;
46+
deDuplicatedSize = tablet == null ? 0 : tablet.rowSize;
4647
}
4748

4849
public void deduplicateAndSortTimestampsIfNecessary() {
@@ -58,19 +59,20 @@ public void deduplicateAndSortTimestampsIfNecessary() {
5859
isSorted = false;
5960
}
6061
if (currentTimestamp == previousTimestamp) {
61-
isDeduplicated = false;
62+
isDeDuplicated = false;
6263
}
6364

64-
if (!isSorted && !isDeduplicated) {
65+
if (!isSorted && !isDeDuplicated) {
6566
break;
6667
}
6768
}
6869

69-
if (isSorted && isDeduplicated) {
70+
if (isSorted && isDeDuplicated) {
7071
return;
7172
}
7273

7374
index = new Integer[tablet.rowSize];
75+
deDuplicatedIndex = new int[tablet.rowSize];
7476
for (int i = 0, size = tablet.rowSize; i < size; i++) {
7577
index[i] = i;
7678
}
@@ -79,124 +81,160 @@ public void deduplicateAndSortTimestampsIfNecessary() {
7981
sortTimestamps();
8082

8183
// Do deduplicate anyway.
82-
// isDeduplicated may be false positive when isSorted is false.
84+
// isDeDuplicated may be false positive when isSorted is false.
8385
deduplicateTimestamps();
84-
isDeduplicated = true;
86+
isDeDuplicated = true;
8587
}
8688

87-
if (!isDeduplicated) {
89+
if (!isDeDuplicated) {
8890
deduplicateTimestamps();
8991
}
9092

91-
sortAndDeduplicateValuesAndBitMaps();
93+
sortAndMayDeduplicateValuesAndBitMaps();
9294
}
9395

9496
private void sortTimestamps() {
97+
// Index is sorted stably because it is Integer[]
9598
Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i]));
9699
Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
97100
}
98101

99102
private void deduplicateTimestamps() {
100-
deduplicatedSize = 1;
103+
deDuplicatedSize = 0;
104+
long[] timestamps = tablet.timestamps;
101105
for (int i = 1, size = tablet.rowSize; i < size; i++) {
102-
if (tablet.timestamps[i] != tablet.timestamps[i - 1]) {
103-
index[deduplicatedSize] = index[i];
104-
tablet.timestamps[deduplicatedSize] = tablet.timestamps[i];
106+
if (timestamps[i] != timestamps[i - 1]) {
107+
deDuplicatedIndex[deDuplicatedSize] = i - 1;
108+
timestamps[deDuplicatedSize] = timestamps[i - 1];
105109

106-
++deduplicatedSize;
110+
++deDuplicatedSize;
107111
}
108112
}
109-
tablet.rowSize = deduplicatedSize;
113+
114+
deDuplicatedIndex[deDuplicatedSize] = tablet.rowSize - 1;
115+
timestamps[deDuplicatedSize] = timestamps[tablet.rowSize - 1];
116+
tablet.rowSize = ++deDuplicatedSize;
110117
}
111118

112-
private void sortAndDeduplicateValuesAndBitMaps() {
119+
// Input:
120+
// Col: [1, null, 3, 6, null]
121+
// Timestamp: [2, 1, 1, 1, 1]
122+
// Intermediate:
123+
// Index: [1, 2, 3, 4, 0]
124+
// SortedTimestamp: [1, 2]
125+
// DeduplicateIndex: [3, 4]
126+
// Output:
127+
// (Used index: [2(3), 4(0)])
128+
// Col: [6, 1]
129+
private void sortAndMayDeduplicateValuesAndBitMaps() {
113130
int columnIndex = 0;
114131
for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
115132
final IMeasurementSchema schema = tablet.getSchemas().get(i);
116133
if (schema != null) {
134+
BitMap deDuplicatedBitMap = null;
135+
BitMap originalBitMap = null;
136+
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
137+
originalBitMap = tablet.bitMaps[columnIndex];
138+
deDuplicatedBitMap = new BitMap(originalBitMap.getSize());
139+
}
140+
117141
tablet.values[columnIndex] =
118-
reorderValueList(deduplicatedSize, tablet.values[columnIndex], schema.getType(), index);
142+
reorderValueListAndBitMap(
143+
tablet.values[columnIndex], schema.getType(), originalBitMap, deDuplicatedBitMap);
144+
119145
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
120-
tablet.bitMaps[columnIndex] =
121-
reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], index);
146+
tablet.bitMaps[columnIndex] = deDuplicatedBitMap;
122147
}
123148
columnIndex++;
124149
}
125150
}
126151
}
127152

128-
private static Object reorderValueList(
129-
int deduplicatedSize,
153+
private Object reorderValueListAndBitMap(
130154
final Object valueList,
131155
final TSDataType dataType,
132-
final Integer[] index) {
156+
final BitMap originalBitMap,
157+
final BitMap deDuplicatedBitMap) {
133158
switch (dataType) {
134159
case BOOLEAN:
135160
final boolean[] boolValues = (boolean[]) valueList;
136-
final boolean[] deduplicatedBoolValues = new boolean[boolValues.length];
137-
for (int i = 0; i < deduplicatedSize; i++) {
138-
deduplicatedBoolValues[i] = boolValues[index[i]];
161+
final boolean[] deDuplicatedBoolValues = new boolean[boolValues.length];
162+
for (int i = 0; i < deDuplicatedSize; i++) {
163+
deDuplicatedBoolValues[i] =
164+
boolValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
139165
}
140-
return deduplicatedBoolValues;
166+
return deDuplicatedBoolValues;
141167
case INT32:
142168
final int[] intValues = (int[]) valueList;
143-
final int[] deduplicatedIntValues = new int[intValues.length];
144-
for (int i = 0; i < deduplicatedSize; i++) {
145-
deduplicatedIntValues[i] = intValues[index[i]];
169+
final int[] deDuplicatedIntValues = new int[intValues.length];
170+
for (int i = 0; i < deDuplicatedSize; i++) {
171+
deDuplicatedIntValues[i] =
172+
intValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
146173
}
147-
return deduplicatedIntValues;
174+
return deDuplicatedIntValues;
148175
case DATE:
149176
final LocalDate[] dateValues = (LocalDate[]) valueList;
150-
final LocalDate[] deduplicatedDateValues = new LocalDate[dateValues.length];
151-
for (int i = 0; i < deduplicatedSize; i++) {
152-
deduplicatedDateValues[i] = dateValues[index[i]];
177+
final LocalDate[] deDuplicatedDateValues = new LocalDate[dateValues.length];
178+
for (int i = 0; i < deDuplicatedSize; i++) {
179+
deDuplicatedDateValues[i] =
180+
dateValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
153181
}
154-
return deduplicatedDateValues;
182+
return deDuplicatedDateValues;
155183
case INT64:
156184
case TIMESTAMP:
157185
final long[] longValues = (long[]) valueList;
158-
final long[] deduplicatedLongValues = new long[longValues.length];
159-
for (int i = 0; i < deduplicatedSize; i++) {
160-
deduplicatedLongValues[i] = longValues[index[i]];
186+
final long[] deDuplicatedLongValues = new long[longValues.length];
187+
for (int i = 0; i < deDuplicatedSize; i++) {
188+
deDuplicatedLongValues[i] =
189+
longValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
161190
}
162-
return deduplicatedLongValues;
191+
return deDuplicatedLongValues;
163192
case FLOAT:
164193
final float[] floatValues = (float[]) valueList;
165-
final float[] deduplicatedFloatValues = new float[floatValues.length];
166-
for (int i = 0; i < deduplicatedSize; i++) {
167-
deduplicatedFloatValues[i] = floatValues[index[i]];
194+
final float[] deDuplicatedFloatValues = new float[floatValues.length];
195+
for (int i = 0; i < deDuplicatedSize; i++) {
196+
deDuplicatedFloatValues[i] =
197+
floatValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
168198
}
169-
return deduplicatedFloatValues;
199+
return deDuplicatedFloatValues;
170200
case DOUBLE:
171201
final double[] doubleValues = (double[]) valueList;
172-
final double[] deduplicatedDoubleValues = new double[doubleValues.length];
173-
for (int i = 0; i < deduplicatedSize; i++) {
174-
deduplicatedDoubleValues[i] = doubleValues[index[i]];
202+
final double[] deDuplicatedDoubleValues = new double[doubleValues.length];
203+
for (int i = 0; i < deDuplicatedSize; i++) {
204+
deDuplicatedDoubleValues[i] =
205+
doubleValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
175206
}
176-
return deduplicatedDoubleValues;
207+
return deDuplicatedDoubleValues;
177208
case TEXT:
178209
case BLOB:
179210
case STRING:
180211
final Binary[] binaryValues = (Binary[]) valueList;
181-
final Binary[] deduplicatedBinaryValues = new Binary[binaryValues.length];
182-
for (int i = 0; i < deduplicatedSize; i++) {
183-
deduplicatedBinaryValues[i] = binaryValues[index[i]];
212+
final Binary[] deDuplicatedBinaryValues = new Binary[binaryValues.length];
213+
for (int i = 0; i < deDuplicatedSize; i++) {
214+
deDuplicatedBinaryValues[i] =
215+
binaryValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)];
184216
}
185-
return deduplicatedBinaryValues;
217+
return deDuplicatedBinaryValues;
186218
default:
187219
throw new UnSupportedDataTypeException(
188220
String.format("Data type %s is not supported.", dataType));
189221
}
190222
}
191223

192-
private static BitMap reorderBitMap(
193-
int deduplicatedSize, final BitMap bitMap, final Integer[] index) {
194-
final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize());
195-
for (int i = 0; i < deduplicatedSize; i++) {
196-
if (bitMap.isMarked(index[i])) {
197-
deduplicatedBitMap.mark(i);
224+
private int getLastNonnullIndex(
225+
final int i, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) {
226+
if (originalBitMap == null) {
227+
return index[deDuplicatedIndex[i]];
228+
}
229+
int lastNonnullIndex = deDuplicatedIndex[i];
230+
final int lastIndex = i > 0 ? deDuplicatedIndex[i - 1] : -1;
231+
while (originalBitMap.isMarked(index[lastNonnullIndex])) {
232+
--lastNonnullIndex;
233+
if (lastNonnullIndex == lastIndex) {
234+
deDuplicatedBitMap.mark(i);
235+
return index[lastNonnullIndex + 1];
198236
}
199237
}
200-
return deduplicatedBitMap;
238+
return index[lastNonnullIndex];
201239
}
202240
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,13 @@ public void testDeduplicate() {
112112

113113
long timestamp = 300;
114114
for (long i = 0; i < 10; i++) {
115-
int rowIndex = tablet.rowSize++;
115+
final int rowIndex = tablet.rowSize++;
116116
tablet.addTimestamp(rowIndex, timestamp);
117117
for (int s = 0; s < 3; s++) {
118-
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp);
118+
tablet.addValue(
119+
schemaList.get(s).getMeasurementId(),
120+
rowIndex,
121+
(i + s) % 3 != 0 ? timestamp + i : null);
119122
}
120123
}
121124

@@ -133,16 +136,9 @@ public void testDeduplicate() {
133136
Assert.assertEquals(indices.size(), tablet.rowSize);
134137

135138
final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize);
136-
for (int i = 0; i < 3; ++i) {
137-
Assert.assertArrayEquals(
138-
timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize));
139-
}
140-
141-
for (int i = 1; i < tablet.rowSize; ++i) {
142-
Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
143-
for (int j = 0; j < 3; ++j) {
144-
Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) tablet.values[j])[i - 1]);
145-
}
139+
Assert.assertEquals(timestamps[0] + 8, ((long[]) tablet.values[0])[0]);
140+
for (int i = 1; i < 3; ++i) {
141+
Assert.assertEquals(timestamps[0] + 9, ((long[]) tablet.values[i])[0]);
146142
}
147143
}
148144

0 commit comments

Comments
 (0)