Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,46 @@ public void deleteTimeSeriesMultiIntervalTest() {
fail(e.getMessage());
}
}

@Test
public void deleteTimeseriesAndCreateSameTypeTest2() throws Exception {
String[] retArray = new String[] {"1,4.0,", "2,8.0,"};
int cnt = 0;
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"create aligned timeseries root.turbine1.d1(s1 FLOAT encoding=PLAIN compression=SNAPPY, "
+ "s2 INT64 encoding=PLAIN compression=SNAPPY, s4 DOUBLE encoding=PLAIN compression=SNAPPY)");
statement.execute("INSERT INTO root.turbine1.d1(timestamp,s1,s2,s4) ALIGNED VALUES(1,1,2,4)");

try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM root.turbine1.d1")) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
Assert.assertEquals(retArray[cnt], builder.toString());
cnt++;
}
}
// delete series in the middle
statement.execute("DELETE timeseries root.turbine1.d1.s4");
statement.execute(
"INSERT INTO root.turbine1.d1(timestamp,s3,s4) ALIGNED VALUES(2,false,8.0)");
statement.execute("FLUSH");

try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM root.turbine1.d1")) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
Assert.assertEquals(retArray[cnt], builder.toString());
cnt++;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
public class AlignedWritableMemChunk extends AbstractWritableMemChunk {

private final Map<String, Integer> measurementIndexMap;
private final List<TSDataType> dataTypes;
private List<TSDataType> dataTypes;
private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
private List<AlignedTVList> sortedList;
Expand Down Expand Up @@ -197,7 +197,8 @@ protected void handoverAlignedTvList() {
list.sort();
}
sortedList.add(list);
this.list = AlignedTVList.newAlignedList(dataTypes);
this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
this.dataTypes = list.getTsDataTypes();
}

@Override
Expand Down Expand Up @@ -230,6 +231,107 @@ public void writeAlignedTablet(
}
}

/**
* Check metadata of columns and return array that mapping existed metadata to index of data
* column.
*
* @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries
* have been deleted, there will be null in its slot.
* @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]]
*/
private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) {
Object[] reorderedColumnValues = new Object[schemaList.size()];
BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()];
for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
if (measurementSchema != null) {
Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName());
// Index is null means this measurement was not in this AlignedTVList before.
// We need to extend a new column in AlignedMemChunk and AlignedTVList.
// And the reorderedColumnValues should extend one more column for the new measurement
if (index == null) {
index = this.list.getTsDataTypes().size();
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index);
this.schemaList.add(schemaListInInsertPlan.get(i));
this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
reorderedColumnValues =
Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1);
if (reorderedBitMaps != null) {
reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1);
}
}
reorderedColumnValues[index] = columnValues[i];
if (bitMaps != null) {
reorderedBitMaps[index] = bitMaps[i];
}
}
}
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
}

private void filterDeletedTimeStamp(
AlignedTVList alignedTVList,
List<List<TimeRange>> valueColumnsDeletionList,
boolean ignoreAllNullRows,
Map<Long, BitMap> timestampWithBitmap) {
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();

int rowCount = alignedTVList.rowCount();
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
if (valueColumnsDeletionList != null) {
valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0}));
}

for (int row = 0; row < rowCount; row++) {
// the row is deleted
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) {
continue;
}
long timestamp = alignedTVList.getTime(row);

BitMap bitMap = new BitMap(schemaList.size());
for (int column = 0; column < schemaList.size(); column++) {
if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) {
bitMap.mark(column);
}

// skip deleted row
if (valueColumnsDeletionList != null
&& !valueColumnsDeletionList.isEmpty()
&& isPointDeleted(
timestamp,
valueColumnsDeletionList.get(column),
valueColumnDeleteCursor.get(column))) {
bitMap.mark(column);
}

// skip all-null row
if (ignoreAllNullRows && bitMap.isAllMarked()) {
continue;
}
timestampWithBitmap.put(timestamp, bitMap);
}
}
}

public long[] getFilteredTimestamp(
List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) {
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();

filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap);
for (AlignedTVList alignedTVList : sortedList) {
filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap);
}

List<Long> filteredTimestamps = new ArrayList<>();
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
filteredTimestamps.add(entry.getKey());
bitMaps.add(entry.getValue());
}
return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
}

@Override
public AlignedTVList getWorkingTVList() {
return list;
Expand Down Expand Up @@ -785,112 +887,4 @@ public List<Integer> buildColumnIndexList(List<IMeasurementSchema> schemaList) {
}
return columnIndexList;
}

/**
* Check metadata of columns and return array that mapping existed metadata to index of data
* column.
*
* @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries
* have been deleted, there will be null in its slot.
* @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]]
*/
private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) {
Object[] reorderedColumnValues = new Object[schemaList.size()];
BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()];
for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
if (measurementSchema != null) {
Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName());
// Index is null means this measurement was not in this AlignedTVList before.
// We need to extend a new column in AlignedMemChunk and AlignedTVList.
// And the reorderedColumnValues should extend one more column for the new measurement
if (index == null) {
index =
measurementIndexMap.isEmpty()
? 0
: measurementIndexMap.values().stream()
.mapToInt(Integer::intValue)
.max()
.getAsInt()
+ 1;
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index);
this.schemaList.add(schemaListInInsertPlan.get(i));
this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
reorderedColumnValues =
Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1);
if (reorderedBitMaps != null) {
reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1);
}
}
reorderedColumnValues[index] = columnValues[i];
if (bitMaps != null) {
reorderedBitMaps[index] = bitMaps[i];
}
}
}
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
}

private void filterDeletedTimeStamp(
AlignedTVList alignedTVList,
List<List<TimeRange>> valueColumnsDeletionList,
boolean ignoreAllNullRows,
Map<Long, BitMap> timestampWithBitmap) {
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();

int rowCount = alignedTVList.rowCount();
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
if (valueColumnsDeletionList != null) {
valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0}));
}

for (int row = 0; row < rowCount; row++) {
// the row is deleted
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) {
continue;
}
long timestamp = alignedTVList.getTime(row);

BitMap bitMap = new BitMap(schemaList.size());
for (int column = 0; column < schemaList.size(); column++) {
if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) {
bitMap.mark(column);
}

// skip deleted row
if (valueColumnsDeletionList != null
&& !valueColumnsDeletionList.isEmpty()
&& isPointDeleted(
timestamp,
valueColumnsDeletionList.get(column),
valueColumnDeleteCursor.get(column))) {
bitMap.mark(column);
}

// skip all-null row
if (ignoreAllNullRows && bitMap.isAllMarked()) {
continue;
}
timestampWithBitmap.put(timestamp, bitMap);
}
}
}

public long[] getFilteredTimestamp(
List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) {
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();

filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap);
for (AlignedTVList alignedTVList : sortedList) {
filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap);
}

List<Long> filteredTimestamps = new ArrayList<>();
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
filteredTimestamps.add(entry.getKey());
bitMaps.add(entry.getValue());
}
return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public TVList getTvListByColumnIndex(
}
}
}
AlignedTVList alignedTvList = AlignedTVList.newAlignedList(dataTypeList);
AlignedTVList alignedTvList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypeList));
alignedTvList.timestamps = this.timestamps;
alignedTvList.indices = this.indices;
alignedTvList.values = values;
Expand All @@ -147,7 +147,7 @@ public TVList getTvListByColumnIndex(

@Override
public synchronized AlignedTVList clone() {
AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
cloneAs(cloneList);
cloneList.timeDeletedCnt = this.timeDeletedCnt;
System.arraycopy(
Expand Down Expand Up @@ -346,10 +346,11 @@ private TsPrimitiveType getAlignedValueByValueIndex(

public void extendColumn(TSDataType dataType) {
if (bitMaps == null) {
bitMaps = new ArrayList<>(values.size());
List<List<BitMap>> localBitMaps = new ArrayList<>(values.size());
for (int i = 0; i < values.size(); i++) {
bitMaps.add(null);
localBitMaps.add(null);
}
bitMaps = localBitMaps;
}
List<Object> columnValue = new ArrayList<>();
List<BitMap> columnBitMaps = new ArrayList<>();
Expand Down Expand Up @@ -611,10 +612,11 @@ public Pair<Integer, Boolean> delete(long lowerBound, long upperBound, int colum

public void deleteColumn(int columnIndex) {
if (bitMaps == null) {
bitMaps = new ArrayList<>(dataTypes.size());
List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
for (int j = 0; j < dataTypes.size(); j++) {
bitMaps.add(null);
localBitMaps.add(null);
}
bitMaps = localBitMaps;
}
if (bitMaps.get(columnIndex) == null) {
List<BitMap> columnBitMaps = new ArrayList<>();
Expand All @@ -624,6 +626,9 @@ public void deleteColumn(int columnIndex) {
bitMaps.set(columnIndex, columnBitMaps);
}
for (int i = 0; i < bitMaps.get(columnIndex).size(); i++) {
if (bitMaps.get(columnIndex).get(i) == null) {
bitMaps.get(columnIndex).set(i, new BitMap(ARRAY_SIZE));
}
bitMaps.get(columnIndex).get(i).markAll();
}
}
Expand Down Expand Up @@ -867,10 +872,11 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex
private void markNullValue(int columnIndex, int arrayIndex, int elementIndex) {
// init BitMaps if doesn't have
if (bitMaps == null) {
bitMaps = new ArrayList<>(dataTypes.size());
List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
for (int i = 0; i < dataTypes.size(); i++) {
bitMaps.add(null);
localBitMaps.add(null);
}
bitMaps = localBitMaps;
}

// if the bitmap in columnIndex is null, init the bitmap of this column from the beginning
Expand Down Expand Up @@ -1398,7 +1404,7 @@ public static AlignedTVList deserialize(DataInputStream stream) throws IOExcepti
bitMaps[columnIndex] = bitMap;
}

AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
AlignedTVList tvList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
tvList.putAlignedValues(times, values, bitMaps, 0, rowCount, null);

boolean hasTimeColDeletedMap = stream.read() == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,11 @@ public int getValueIndex(int index) {
protected void markNullValue(int arrayIndex, int elementIndex) {
// init bitMap if doesn't have
if (bitMap == null) {
bitMap = new ArrayList<>();
List<BitMap> localBitMap = new ArrayList<>();
for (int i = 0; i < timestamps.size(); i++) {
bitMap.add(new BitMap(ARRAY_SIZE));
localBitMap.add(new BitMap(ARRAY_SIZE));
}
bitMap = localBitMap;
}
// if the bitmap in arrayIndex is null, init the bitmap
if (bitMap.get(arrayIndex) == null) {
Expand Down
Loading