|
61 | 61 | public class AlignedWritableMemChunk extends AbstractWritableMemChunk { |
62 | 62 |
|
63 | 63 | private final Map<String, Integer> measurementIndexMap; |
64 | | - private final List<TSDataType> dataTypes; |
| 64 | + private List<TSDataType> dataTypes; |
65 | 65 | private final List<IMeasurementSchema> schemaList; |
66 | 66 | private AlignedTVList list; |
67 | 67 | private List<AlignedTVList> sortedList; |
@@ -197,7 +197,8 @@ protected void handoverAlignedTvList() { |
197 | 197 | list.sort(); |
198 | 198 | } |
199 | 199 | sortedList.add(list); |
200 | | - this.list = AlignedTVList.newAlignedList(dataTypes); |
| 200 | + this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); |
| 201 | + this.dataTypes = list.getTsDataTypes(); |
201 | 202 | } |
202 | 203 |
|
203 | 204 | @Override |
@@ -230,6 +231,107 @@ public void writeAlignedTablet( |
230 | 231 | } |
231 | 232 | } |
232 | 233 |
|
| 234 | + /** |
| 235 | + * Check metadata of columns and return array that mapping existed metadata to index of data |
| 236 | + * column. |
| 237 | + * |
| 238 | + * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries |
| 239 | + * have been deleted, there will be null in its slot. |
| 240 | + * @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]] |
| 241 | + */ |
| 242 | + private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan( |
| 243 | + List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) { |
| 244 | + Object[] reorderedColumnValues = new Object[schemaList.size()]; |
| 245 | + BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()]; |
| 246 | + for (int i = 0; i < schemaListInInsertPlan.size(); i++) { |
| 247 | + IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i); |
| 248 | + if (measurementSchema != null) { |
| 249 | + Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName()); |
| 250 | + // Index is null means this measurement was not in this AlignedTVList before. |
| 251 | + // We need to extend a new column in AlignedMemChunk and AlignedTVList. |
| 252 | + // And the reorderedColumnValues should extend one more column for the new measurement |
| 253 | + if (index == null) { |
| 254 | + index = this.list.getTsDataTypes().size(); |
| 255 | + this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index); |
| 256 | + this.schemaList.add(schemaListInInsertPlan.get(i)); |
| 257 | + this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); |
| 258 | + reorderedColumnValues = |
| 259 | + Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1); |
| 260 | + if (reorderedBitMaps != null) { |
| 261 | + reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1); |
| 262 | + } |
| 263 | + } |
| 264 | + reorderedColumnValues[index] = columnValues[i]; |
| 265 | + if (bitMaps != null) { |
| 266 | + reorderedBitMaps[index] = bitMaps[i]; |
| 267 | + } |
| 268 | + } |
| 269 | + } |
| 270 | + return new Pair<>(reorderedColumnValues, reorderedBitMaps); |
| 271 | + } |
| 272 | + |
| 273 | + private void filterDeletedTimeStamp( |
| 274 | + AlignedTVList alignedTVList, |
| 275 | + List<List<TimeRange>> valueColumnsDeletionList, |
| 276 | + boolean ignoreAllNullRows, |
| 277 | + Map<Long, BitMap> timestampWithBitmap) { |
| 278 | + BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); |
| 279 | + |
| 280 | + int rowCount = alignedTVList.rowCount(); |
| 281 | + List<int[]> valueColumnDeleteCursor = new ArrayList<>(); |
| 282 | + if (valueColumnsDeletionList != null) { |
| 283 | + valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); |
| 284 | + } |
| 285 | + |
| 286 | + for (int row = 0; row < rowCount; row++) { |
| 287 | + // the row is deleted |
| 288 | + if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { |
| 289 | + continue; |
| 290 | + } |
| 291 | + long timestamp = alignedTVList.getTime(row); |
| 292 | + |
| 293 | + BitMap bitMap = new BitMap(schemaList.size()); |
| 294 | + for (int column = 0; column < schemaList.size(); column++) { |
| 295 | + if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { |
| 296 | + bitMap.mark(column); |
| 297 | + } |
| 298 | + |
| 299 | + // skip deleted row |
| 300 | + if (valueColumnsDeletionList != null |
| 301 | + && !valueColumnsDeletionList.isEmpty() |
| 302 | + && isPointDeleted( |
| 303 | + timestamp, |
| 304 | + valueColumnsDeletionList.get(column), |
| 305 | + valueColumnDeleteCursor.get(column))) { |
| 306 | + bitMap.mark(column); |
| 307 | + } |
| 308 | + |
| 309 | + // skip all-null row |
| 310 | + if (ignoreAllNullRows && bitMap.isAllMarked()) { |
| 311 | + continue; |
| 312 | + } |
| 313 | + timestampWithBitmap.put(timestamp, bitMap); |
| 314 | + } |
| 315 | + } |
| 316 | + } |
| 317 | + |
| 318 | + public long[] getFilteredTimestamp( |
| 319 | + List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) { |
| 320 | + Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); |
| 321 | + |
| 322 | + filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); |
| 323 | + for (AlignedTVList alignedTVList : sortedList) { |
| 324 | + filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); |
| 325 | + } |
| 326 | + |
| 327 | + List<Long> filteredTimestamps = new ArrayList<>(); |
| 328 | + for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { |
| 329 | + filteredTimestamps.add(entry.getKey()); |
| 330 | + bitMaps.add(entry.getValue()); |
| 331 | + } |
| 332 | + return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); |
| 333 | + } |
| 334 | + |
233 | 335 | @Override |
234 | 336 | public AlignedTVList getWorkingTVList() { |
235 | 337 | return list; |
@@ -785,112 +887,4 @@ public List<Integer> buildColumnIndexList(List<IMeasurementSchema> schemaList) { |
785 | 887 | } |
786 | 888 | return columnIndexList; |
787 | 889 | } |
788 | | - |
789 | | - /** |
790 | | - * Check metadata of columns and return array that mapping existed metadata to index of data |
791 | | - * column. |
792 | | - * |
793 | | - * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries |
794 | | - * have been deleted, there will be null in its slot. |
795 | | - * @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]] |
796 | | - */ |
797 | | - private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan( |
798 | | - List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) { |
799 | | - Object[] reorderedColumnValues = new Object[schemaList.size()]; |
800 | | - BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()]; |
801 | | - for (int i = 0; i < schemaListInInsertPlan.size(); i++) { |
802 | | - IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i); |
803 | | - if (measurementSchema != null) { |
804 | | - Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName()); |
805 | | - // Index is null means this measurement was not in this AlignedTVList before. |
806 | | - // We need to extend a new column in AlignedMemChunk and AlignedTVList. |
807 | | - // And the reorderedColumnValues should extend one more column for the new measurement |
808 | | - if (index == null) { |
809 | | - index = |
810 | | - measurementIndexMap.isEmpty() |
811 | | - ? 0 |
812 | | - : measurementIndexMap.values().stream() |
813 | | - .mapToInt(Integer::intValue) |
814 | | - .max() |
815 | | - .getAsInt() |
816 | | - + 1; |
817 | | - this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index); |
818 | | - this.schemaList.add(schemaListInInsertPlan.get(i)); |
819 | | - this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); |
820 | | - reorderedColumnValues = |
821 | | - Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1); |
822 | | - if (reorderedBitMaps != null) { |
823 | | - reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1); |
824 | | - } |
825 | | - } |
826 | | - reorderedColumnValues[index] = columnValues[i]; |
827 | | - if (bitMaps != null) { |
828 | | - reorderedBitMaps[index] = bitMaps[i]; |
829 | | - } |
830 | | - } |
831 | | - } |
832 | | - return new Pair<>(reorderedColumnValues, reorderedBitMaps); |
833 | | - } |
834 | | - |
835 | | - private void filterDeletedTimeStamp( |
836 | | - AlignedTVList alignedTVList, |
837 | | - List<List<TimeRange>> valueColumnsDeletionList, |
838 | | - boolean ignoreAllNullRows, |
839 | | - Map<Long, BitMap> timestampWithBitmap) { |
840 | | - BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); |
841 | | - |
842 | | - int rowCount = alignedTVList.rowCount(); |
843 | | - List<int[]> valueColumnDeleteCursor = new ArrayList<>(); |
844 | | - if (valueColumnsDeletionList != null) { |
845 | | - valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); |
846 | | - } |
847 | | - |
848 | | - for (int row = 0; row < rowCount; row++) { |
849 | | - // the row is deleted |
850 | | - if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { |
851 | | - continue; |
852 | | - } |
853 | | - long timestamp = alignedTVList.getTime(row); |
854 | | - |
855 | | - BitMap bitMap = new BitMap(schemaList.size()); |
856 | | - for (int column = 0; column < schemaList.size(); column++) { |
857 | | - if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { |
858 | | - bitMap.mark(column); |
859 | | - } |
860 | | - |
861 | | - // skip deleted row |
862 | | - if (valueColumnsDeletionList != null |
863 | | - && !valueColumnsDeletionList.isEmpty() |
864 | | - && isPointDeleted( |
865 | | - timestamp, |
866 | | - valueColumnsDeletionList.get(column), |
867 | | - valueColumnDeleteCursor.get(column))) { |
868 | | - bitMap.mark(column); |
869 | | - } |
870 | | - |
871 | | - // skip all-null row |
872 | | - if (ignoreAllNullRows && bitMap.isAllMarked()) { |
873 | | - continue; |
874 | | - } |
875 | | - timestampWithBitmap.put(timestamp, bitMap); |
876 | | - } |
877 | | - } |
878 | | - } |
879 | | - |
880 | | - public long[] getFilteredTimestamp( |
881 | | - List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) { |
882 | | - Map<Long, BitMap> timestampWithBitmap = new TreeMap<>(); |
883 | | - |
884 | | - filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); |
885 | | - for (AlignedTVList alignedTVList : sortedList) { |
886 | | - filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); |
887 | | - } |
888 | | - |
889 | | - List<Long> filteredTimestamps = new ArrayList<>(); |
890 | | - for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) { |
891 | | - filteredTimestamps.add(entry.getKey()); |
892 | | - bitMaps.add(entry.getValue()); |
893 | | - } |
894 | | - return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); |
895 | | - } |
896 | 890 | } |
0 commit comments