diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/core/src/row/jit/mod.rs new file mode 100644 index 0000000000000..fbb6efe3c0b2d --- /dev/null +++ b/datafusion/core/src/row/jit/mod.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Just-In-Time(JIT) version for row reader and writers + +mod reader; +mod writer; + +#[macro_export] +/// register external functions to the assembler +macro_rules! reg_fn { + ($ASS:ident, $FN: path, $PARAM: expr, $RET: expr) => { + $ASS.register_extern_fn(fn_name($FN), $FN as *const u8, $PARAM, $RET)?; + }; +} + +fn fn_name(f: T) -> &'static str { + fn type_name_of(_: T) -> &'static str { + std::any::type_name::() + } + let name = type_name_of(f); + + // Find and cut the rest of the path + match &name.rfind(':') { + Some(pos) => &name[pos + 1..name.len()], + None => name, + } +} + +#[cfg(test)] +mod tests { + use crate::error::Result; + use crate::row::jit::reader::read_as_batch_jit; + use crate::row::jit::writer::write_batch_unchecked_jit; + use arrow::record_batch::RecordBatch; + use arrow::{array::*, datatypes::*}; + use datafusion_jit::api::Assembler; + use std::sync::Arc; + use DataType::*; + + macro_rules! fn_test_single_type { + ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { + paste::item! { + #[test] + #[allow(non_snake_case)] + fn []() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); + let a = $ARRAY::from($VEC); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let assembler = Assembler::default(); + let row_offsets = + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[test] + #[allow(non_snake_case)] + fn []() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); + let a = $ARRAY::from(v); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let assembler = Assembler::default(); + let row_offsets = + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + } + }; + } + + fn_test_single_type!( + BooleanArray, + Boolean, + vec![Some(true), Some(false), None, Some(true), None] + ); + + fn_test_single_type!( + Int8Array, + Int8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int16Array, + Int16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int32Array, + Int32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int64Array, + Int64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt8Array, + UInt8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt16Array, + UInt16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt32Array, + UInt32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt64Array, + UInt64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Float32Array, + Float32, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Float64Array, + Float64, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Date32Array, + Date32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Date64Array, + Date64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + StringArray, + Utf8, + vec![Some("hello"), Some("world"), None, Some(""), Some("")] + ); + + #[test] + fn test_single_binary_jit() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); + let values: Vec> = + vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; + let a = BinaryArray::from_opt_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let assembler = Assembler::default(); + let row_offsets = { + write_batch_unchecked_jit( + &mut vector, + 0, + &batch, + 0, + schema.clone(), + &assembler, + )? + }; + let output_batch = + { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[test] + fn test_single_binary_jit_null_free() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; + let a = BinaryArray::from_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let assembler = Assembler::default(); + let row_offsets = { + write_batch_unchecked_jit( + &mut vector, + 0, + &batch, + 0, + schema.clone(), + &assembler, + )? + }; + let output_batch = + { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + assert_eq!(batch, output_batch); + Ok(()) + } +} diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/core/src/row/jit/reader.rs new file mode 100644 index 0000000000000..c1018366720c7 --- /dev/null +++ b/datafusion/core/src/row/jit/reader.rs @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Accessing row from raw bytes with JIT + +use crate::error::{DataFusionError, Result}; +use crate::reg_fn; +use crate::row::jit::fn_name; +use crate::row::reader::RowReader; +use crate::row::reader::*; +use crate::row::MutableRecordBatch; +use arrow::array::ArrayBuilder; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion_jit::api::Assembler; +use datafusion_jit::api::GeneratedFunction; +use datafusion_jit::ast::{I64, PTR}; +use std::sync::Arc; + +/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch + +pub fn read_as_batch_jit( + data: &[u8], + schema: Arc, + offsets: &[usize], + assembler: &Assembler, +) -> Result { + let row_num = offsets.len(); + let mut output = MutableRecordBatch::new(row_num, schema.clone()); + let mut row = RowReader::new(&schema); + register_read_functions(assembler)?; + let gen_func = gen_read_row(&schema, assembler)?; + let mut jit = assembler.create_jit(); + let code_ptr = jit.compile(gen_func)?; + let code_fn = unsafe { + std::mem::transmute::<_, fn(&RowReader, &mut MutableRecordBatch)>(code_ptr) + }; + + for offset in offsets.iter().take(row_num) { + row.point_to(*offset, data); + code_fn(&row, &mut output); + } + + output.output().map_err(DataFusionError::ArrowError) +} + +fn get_array_mut( + batch: &mut MutableRecordBatch, + col_idx: usize, +) -> &mut Box { + let arrays: &mut [Box] = batch.arrays.as_mut(); + &mut arrays[col_idx] +} + +fn register_read_functions(asm: &Assembler) -> Result<()> { + let reader_param = vec![PTR, I64, PTR]; + reg_fn!(asm, get_array_mut, vec![PTR, I64], Some(PTR)); + reg_fn!(asm, read_field_bool, reader_param.clone(), None); + reg_fn!(asm, read_field_u8, reader_param.clone(), None); + reg_fn!(asm, read_field_u16, reader_param.clone(), None); + reg_fn!(asm, read_field_u32, reader_param.clone(), None); + reg_fn!(asm, read_field_u64, reader_param.clone(), None); + reg_fn!(asm, read_field_i8, reader_param.clone(), None); + reg_fn!(asm, read_field_i16, reader_param.clone(), None); + reg_fn!(asm, read_field_i32, reader_param.clone(), None); + reg_fn!(asm, read_field_i64, reader_param.clone(), None); + reg_fn!(asm, read_field_f32, reader_param.clone(), None); + reg_fn!(asm, read_field_f64, reader_param.clone(), None); + reg_fn!(asm, read_field_date32, reader_param.clone(), None); + reg_fn!(asm, read_field_date64, reader_param.clone(), None); + reg_fn!(asm, read_field_utf8, reader_param.clone(), None); + reg_fn!(asm, read_field_binary, reader_param.clone(), None); + reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_u64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i16_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_i64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_binary_null_free, reader_param, None); + Ok(()) +} + +fn gen_read_row( + schema: &Arc, + assembler: &Assembler, +) -> Result { + use DataType::*; + let mut builder = assembler + .new_func_builder("read_row") + .param("row", PTR) + .param("batch", PTR); + let mut b = builder.enter_block(); + for (i, f) in schema.fields().iter().enumerate() { + let dt = f.data_type(); + let arr = format!("a{}", i); + b.declare_as( + &arr, + b.call("get_array_mut", vec![b.id("batch")?, b.lit_i(i as i64)])?, + )?; + let params = vec![b.id(&arr)?, b.lit_i(i as i64), b.id("row")?]; + if f.is_nullable() { + match dt { + Boolean => b.call_stmt("read_field_bool", params)?, + UInt8 => b.call_stmt("read_field_u8", params)?, + UInt16 => b.call_stmt("read_field_u16", params)?, + UInt32 => b.call_stmt("read_field_u32", params)?, + UInt64 => b.call_stmt("read_field_u64", params)?, + Int8 => b.call_stmt("read_field_i8", params)?, + Int16 => b.call_stmt("read_field_i16", params)?, + Int32 => b.call_stmt("read_field_i32", params)?, + Int64 => b.call_stmt("read_field_i64", params)?, + Float32 => b.call_stmt("read_field_f32", params)?, + Float64 => b.call_stmt("read_field_f64", params)?, + Date32 => b.call_stmt("read_field_date32", params)?, + Date64 => b.call_stmt("read_field_date64", params)?, + Utf8 => b.call_stmt("read_field_utf8", params)?, + Binary => b.call_stmt("read_field_binary", params)?, + _ => unimplemented!(), + } + } else { + match dt { + Boolean => b.call_stmt("read_field_bool_null_free", params)?, + UInt8 => b.call_stmt("read_field_u8_null_free", params)?, + UInt16 => b.call_stmt("read_field_u16_null_free", params)?, + UInt32 => b.call_stmt("read_field_u32_null_free", params)?, + UInt64 => b.call_stmt("read_field_u64_null_free", params)?, + Int8 => b.call_stmt("read_field_i8_null_free", params)?, + Int16 => b.call_stmt("read_field_i16_null_free", params)?, + Int32 => b.call_stmt("read_field_i32_null_free", params)?, + Int64 => b.call_stmt("read_field_i64_null_free", params)?, + Float32 => b.call_stmt("read_field_f32_null_free", params)?, + Float64 => b.call_stmt("read_field_f64_null_free", params)?, + Date32 => b.call_stmt("read_field_date32_null_free", params)?, + Date64 => b.call_stmt("read_field_date64_null_free", params)?, + Utf8 => b.call_stmt("read_field_utf8_null_free", params)?, + Binary => b.call_stmt("read_field_binary_null_free", params)?, + _ => unimplemented!(), + } + } + } + Ok(b.build()) +} diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/core/src/row/jit/writer.rs new file mode 100644 index 0000000000000..5b077caf6a471 --- /dev/null +++ b/datafusion/core/src/row/jit/writer.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reusable JIT version of row writer backed by Vec to stitch attributes together + +use crate::error::Result; +use crate::reg_fn; +use crate::row::jit::fn_name; +use crate::row::schema_null_free; +use crate::row::writer::RowWriter; +use crate::row::writer::*; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion_jit::api::CodeBlock; +use datafusion_jit::api::{Assembler, GeneratedFunction}; +use datafusion_jit::ast::Expr; +use datafusion_jit::ast::{BOOL, I64, PTR}; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked_jit( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc, + assembler: &Assembler, +) -> Result> { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + register_write_functions(assembler)?; + let gen_func = gen_write_row(&schema, assembler)?; + let mut jit = assembler.create_jit(); + let code_ptr = jit.compile(gen_func)?; + + let code_fn = unsafe { + std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) + }; + + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + code_fn(&mut writer, cur_row, batch); + writer.end_padding(); + let row_width = writer.row_width; + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + Ok(offsets) +} + +/// bench jit version write +#[inline(never)] +pub fn bench_write_batch_jit( + batches: &[Vec], + schema: Arc, +) -> Result> { + let assembler = Assembler::default(); + let mut writer = RowWriter::new(&schema); + let mut lengths = vec![]; + register_write_functions(&assembler)?; + let gen_func = gen_write_row(&schema, &assembler)?; + let mut jit = assembler.create_jit(); + let code_ptr = jit.compile(gen_func)?; + let code_fn = unsafe { + std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) + }; + + for batch in batches.iter().flatten() { + for cur_row in 0..batch.num_rows() { + code_fn(&mut writer, cur_row, batch); + writer.end_padding(); + lengths.push(writer.row_width); + writer.reset() + } + } + Ok(lengths) +} + +// we could remove this function wrapper once we find a way to call the trait method directly. +fn is_null(col: &Arc, row_idx: usize) -> bool { + col.is_null(row_idx) +} + +fn register_write_functions(asm: &Assembler) -> Result<()> { + let reader_param = vec![PTR, I64, PTR]; + reg_fn!(asm, RecordBatch::column, vec![PTR, I64], Some(PTR)); + reg_fn!(asm, RowWriter::set_null_at, vec![PTR, I64], None); + reg_fn!(asm, RowWriter::set_non_null_at, vec![PTR, I64], None); + reg_fn!(asm, is_null, vec![PTR, I64], Some(BOOL)); + reg_fn!(asm, write_field_bool, reader_param.clone(), None); + reg_fn!(asm, write_field_u8, reader_param.clone(), None); + reg_fn!(asm, write_field_u16, reader_param.clone(), None); + reg_fn!(asm, write_field_u32, reader_param.clone(), None); + reg_fn!(asm, write_field_u64, reader_param.clone(), None); + reg_fn!(asm, write_field_i8, reader_param.clone(), None); + reg_fn!(asm, write_field_i16, reader_param.clone(), None); + reg_fn!(asm, write_field_i32, reader_param.clone(), None); + reg_fn!(asm, write_field_i64, reader_param.clone(), None); + reg_fn!(asm, write_field_f32, reader_param.clone(), None); + reg_fn!(asm, write_field_f64, reader_param.clone(), None); + reg_fn!(asm, write_field_date32, reader_param.clone(), None); + reg_fn!(asm, write_field_date64, reader_param.clone(), None); + reg_fn!(asm, write_field_utf8, reader_param.clone(), None); + reg_fn!(asm, write_field_binary, reader_param, None); + Ok(()) +} + +fn gen_write_row( + schema: &Arc, + assembler: &Assembler, +) -> Result { + let mut builder = assembler + .new_func_builder("write_row") + .param("row", PTR) + .param("row_idx", I64) + .param("batch", PTR); + let null_free = schema_null_free(schema); + let mut b = builder.enter_block(); + for (i, f) in schema.fields().iter().enumerate() { + let dt = f.data_type(); + let arr = format!("a{}", i); + b.declare_as( + &arr, + b.call("column", vec![b.id("batch")?, b.lit_i(i as i64)])?, + )?; + if f.is_nullable() { + b.if_block( + |c| c.call("is_null", vec![c.id(&arr)?, c.id("row_idx")?]), + |t| { + t.call_stmt("set_null_at", vec![t.id("row")?, t.lit_i(i as i64)])?; + Ok(()) + }, + |e| { + e.call_stmt( + "set_non_null_at", + vec![e.id("row")?, e.lit_i(i as i64)], + )?; + let params = vec![ + e.id("row")?, + e.id(&arr)?, + e.lit_i(i as i64), + e.id("row_idx")?, + ]; + write_typed_field_stmt(dt, e, params)?; + Ok(()) + }, + )?; + } else { + if !null_free { + b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?; + } + let params = vec![ + b.id("row")?, + b.id(&arr)?, + b.lit_i(i as i64), + b.id("row_idx")?, + ]; + write_typed_field_stmt(dt, &mut b, params)?; + } + } + Ok(b.build()) +} + +fn write_typed_field_stmt<'a>( + dt: &DataType, + b: &mut CodeBlock<'a>, + params: Vec, +) -> Result<()> { + use DataType::*; + match dt { + Boolean => b.call_stmt("write_field_bool", params)?, + UInt8 => b.call_stmt("write_field_u8", params)?, + UInt16 => b.call_stmt("write_field_u16", params)?, + UInt32 => b.call_stmt("write_field_u32", params)?, + UInt64 => b.call_stmt("write_field_u64", params)?, + Int8 => b.call_stmt("write_field_i8", params)?, + Int16 => b.call_stmt("write_field_i16", params)?, + Int32 => b.call_stmt("write_field_i32", params)?, + Int64 => b.call_stmt("write_field_i64", params)?, + Float32 => b.call_stmt("write_field_f32", params)?, + Float64 => b.call_stmt("write_field_f64", params)?, + Date32 => b.call_stmt("write_field_date32", params)?, + Date64 => b.call_stmt("write_field_date64", params)?, + Utf8 => b.call_stmt("write_field_utf8", params)?, + Binary => b.call_stmt("write_field_binary", params)?, + _ => unimplemented!(), + } + Ok(()) +} diff --git a/datafusion/core/src/row/layout.rs b/datafusion/core/src/row/layout.rs new file mode 100644 index 0000000000000..71699d6eb3b16 --- /dev/null +++ b/datafusion/core/src/row/layout.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Various row layout for different use case + +use crate::row::{schema_null_free, var_length}; +use arrow::datatypes::{DataType, Schema}; +use arrow::util::bit_util::{ceil, round_upto_power_of_2}; +use std::sync::Arc; + +const UTF8_DEFAULT_SIZE: usize = 20; +const BINARY_DEFAULT_SIZE: usize = 100; + +/// Get relative offsets for each field and total width for values +pub fn get_offsets(null_width: usize, schema: &Arc) -> (Vec, usize) { + let mut offsets = vec![]; + let mut offset = null_width; + for f in schema.fields() { + offsets.push(offset); + offset += type_width(f.data_type()); + } + (offsets, offset - null_width) +} + +fn type_width(dt: &DataType) -> usize { + use DataType::*; + if var_length(dt) { + return std::mem::size_of::(); + } + match dt { + Boolean | UInt8 | Int8 => 1, + UInt16 | Int16 => 2, + UInt32 | Int32 | Float32 | Date32 => 4, + UInt64 | Int64 | Float64 | Date64 => 8, + _ => unreachable!(), + } +} + +/// Estimate row width based on schema +pub fn estimate_row_width(schema: &Arc) -> usize { + let null_free = schema_null_free(schema); + let field_count = schema.fields().len(); + let mut width = if null_free { 0 } else { ceil(field_count, 8) }; + for f in schema.fields() { + width += type_width(f.data_type()); + match f.data_type() { + DataType::Utf8 => width += UTF8_DEFAULT_SIZE, + DataType::Binary => width += BINARY_DEFAULT_SIZE, + _ => {} + } + } + round_upto_power_of_2(width, 8) +} diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs index d92da7ae5620c..1fbf012758424 100644 --- a/datafusion/core/src/row/mod.rs +++ b/datafusion/core/src/row/mod.rs @@ -47,83 +47,19 @@ //! 0 1 2 10 14 22 31 32 //! +use arrow::array::{make_builder, ArrayBuilder}; use arrow::datatypes::{DataType, Schema}; -use arrow::util::bit_util::{get_bit_raw, round_upto_power_of_2}; -use std::fmt::Write; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use std::sync::Arc; +#[cfg(feature = "jit")] +mod jit; +mod layout; pub mod reader; +mod validity; pub mod writer; -const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; - -const UTF8_DEFAULT_SIZE: usize = 20; -const BINARY_DEFAULT_SIZE: usize = 100; - -/// Returns if all fields are valid -pub fn all_valid(data: &[u8], n: usize) -> bool { - for item in data.iter().take(n / 8) { - if *item != ALL_VALID_MASK[7] { - return false; - } - } - if n % 8 == 0 { - true - } else { - data[n / 8] == ALL_VALID_MASK[n % 8 - 1] - } -} - -/// Show null bit for each field in a tuple, 1 for valid and 0 for null. -/// For a tuple with nine total fields, valid at field 0, 6, 7, 8 shows as `[10000011, 1]`. -pub struct NullBitsFormatter<'a> { - null_bits: &'a [u8], - field_count: usize, -} - -impl<'a> NullBitsFormatter<'a> { - /// new - pub fn new(null_bits: &'a [u8], field_count: usize) -> Self { - Self { - null_bits, - field_count, - } - } -} - -impl<'a> std::fmt::Debug for NullBitsFormatter<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut is_first = true; - let data = self.null_bits; - for i in 0..self.field_count { - if is_first { - f.write_char('[')?; - is_first = false; - } else if i % 8 == 0 { - f.write_str(", ")?; - } - if unsafe { get_bit_raw(data.as_ptr(), i) } { - f.write_char('1')?; - } else { - f.write_char('0')?; - } - } - f.write_char(']')?; - Ok(()) - } -} - -/// Get relative offsets for each field and total width for values -fn get_offsets(null_width: usize, schema: &Arc) -> (Vec, usize) { - let mut offsets = vec![]; - let mut offset = null_width; - for f in schema.fields() { - offsets.push(offset); - offset += type_width(f.data_type()); - } - (offsets, offset - null_width) -} - fn supported_type(dt: &DataType) -> bool { use DataType::*; matches!( @@ -146,75 +82,67 @@ fn supported_type(dt: &DataType) -> bool { ) } -fn var_length(dt: &DataType) -> bool { - use DataType::*; - matches!(dt, Utf8 | Binary) +/// Tell if we can create raw-bytes based rows since we currently +/// has limited data type supports in the row format +pub fn row_supported(schema: &Arc) -> bool { + schema + .fields() + .iter() + .all(|f| supported_type(f.data_type())) } -fn type_width(dt: &DataType) -> usize { +fn var_length(dt: &DataType) -> bool { use DataType::*; - if var_length(dt) { - return std::mem::size_of::(); - } - match dt { - Boolean | UInt8 | Int8 => 1, - UInt16 | Int16 => 2, - UInt32 | Int32 | Float32 | Date32 => 4, - UInt64 | Int64 | Float64 | Date64 => 8, - _ => unreachable!(), - } -} - -fn estimate_row_width(null_width: usize, schema: &Arc) -> usize { - let mut width = null_width; - for f in schema.fields() { - width += type_width(f.data_type()); - match f.data_type() { - DataType::Utf8 => width += UTF8_DEFAULT_SIZE, - DataType::Binary => width += BINARY_DEFAULT_SIZE, - _ => {} - } - } - round_upto_power_of_2(width, 8) + matches!(dt, Utf8 | Binary) } -fn fixed_size(schema: &Arc) -> bool { +/// Tell if the row is of fixed size +pub fn fixed_size(schema: &Arc) -> bool { schema.fields().iter().all(|f| !var_length(f.data_type())) } -fn supported(schema: &Arc) -> bool { - schema - .fields() - .iter() - .all(|f| supported_type(f.data_type())) +/// Tell if schema contains no nullable field +pub fn schema_null_free(schema: &Arc) -> bool { + schema.fields().iter().all(|f| !f.is_nullable()) } -#[cfg(feature = "jit")] -#[macro_export] -/// register external functions to the assembler -macro_rules! reg_fn { - ($ASS:ident, $FN: path, $PARAM: expr, $RET: expr) => { - $ASS.register_extern_fn(fn_name($FN), $FN as *const u8, $PARAM, $RET)?; - }; +/// Columnar Batch buffer +pub struct MutableRecordBatch { + arrays: Vec>, + schema: Arc, } -#[cfg(feature = "jit")] -fn fn_name(f: T) -> &'static str { - fn type_name_of(_: T) -> &'static str { - std::any::type_name::() +impl MutableRecordBatch { + /// new + pub fn new(target_batch_size: usize, schema: Arc) -> Self { + let arrays = new_arrays(&schema, target_batch_size); + Self { arrays, schema } } - let name = type_name_of(f); - // Find and cut the rest of the path - match &name.rfind(':') { - Some(pos) => &name[pos + 1..name.len()], - None => name, + /// Finalize the batch, output and reset this buffer + pub fn output(&mut self) -> ArrowResult { + let result = make_batch(self.schema.clone(), self.arrays.drain(..).collect()); + result } } -/// Tell if schema contains no nullable field -pub fn schema_null_free(schema: &Arc) -> bool { - schema.fields().iter().all(|f| !f.is_nullable()) +fn new_arrays(schema: &Arc, batch_size: usize) -> Vec> { + schema + .fields() + .iter() + .map(|field| { + let dt = field.data_type(); + make_builder(dt, batch_size) + }) + .collect::>() +} + +fn make_batch( + schema: Arc, + mut arrays: Vec>, +) -> ArrowResult { + let columns = arrays.iter_mut().map(|array| array.finish()).collect(); + RecordBatch::try_new(schema, columns) } #[cfg(test)] @@ -222,106 +150,21 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::FileFormat; - use crate::datasource::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, - }; + use crate::datasource::listing::local_unpartitioned_file; use crate::error::Result; - use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{collect, ExecutionPlan}; + use crate::prelude::SessionContext; use crate::row::reader::read_as_batch; - #[cfg(feature = "jit")] - use crate::row::reader::read_as_batch_jit; use crate::row::writer::write_batch_unchecked; - #[cfg(feature = "jit")] - use crate::row::writer::write_batch_unchecked_jit; use arrow::record_batch::RecordBatch; - use arrow::util::bit_util::{ceil, set_bit_raw, unset_bit_raw}; use arrow::{array::*, datatypes::*}; - #[cfg(feature = "jit")] - use datafusion_jit::api::Assembler; - use rand::Rng; + use datafusion_data_access::object_store::local::LocalFileSystem; + use datafusion_data_access::object_store::local::{ + local_object_reader, local_object_reader_stream, + }; use DataType::*; - fn test_validity(bs: &[bool]) { - let n = bs.len(); - let mut data = vec![0; ceil(n, 8)]; - for (i, b) in bs.iter().enumerate() { - if *b { - let data_argument = &mut data; - unsafe { - set_bit_raw(data_argument.as_mut_ptr(), i); - }; - } else { - let data_argument = &mut data; - unsafe { - unset_bit_raw(data_argument.as_mut_ptr(), i); - }; - } - } - let expected = bs.iter().all(|f| *f); - assert_eq!(all_valid(&data, bs.len()), expected); - } - - #[test] - fn test_all_valid() { - let sizes = [4, 8, 12, 16, 19, 23, 32, 44]; - for i in sizes { - { - // contains false - let input = { - let mut rng = rand::thread_rng(); - let mut input: Vec = vec![false; i]; - rng.fill(&mut input[..]); - input - }; - test_validity(&input); - } - - { - // all true - let input = vec![true; i]; - test_validity(&input); - } - } - } - - #[test] - fn test_formatter() -> std::fmt::Result { - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001], 8)), - "[10000011]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1], 9)), - "[10000011, 1]" - ); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 2)), "[10]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 3)), "[100]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 4)), "[1000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 5)), "[10000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 6)), "[100000]"); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 7)), - "[1000000]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 8)), - "[10000000]" - ); - // extra bytes are ignored - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1, 1], 9)), - "[10000011, 1]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1], 16)), - "[10000011, 10000000]" - ); - Ok(()) - } - macro_rules! fn_test_single_type { ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { paste::item! { @@ -334,23 +177,7 @@ mod tests { let mut vector = vec![0; 1024]; let row_offsets = { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[allow(non_snake_case)] - #[cfg(feature = "jit")] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); - let a = $ARRAY::from($VEC); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 1024]; - let assembler = Assembler::default(); - let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -365,24 +192,7 @@ mod tests { let mut vector = vec![0; 1024]; let row_offsets = { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[allow(non_snake_case)] - #[cfg(feature = "jit")] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); - let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); - let a = $ARRAY::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 1024]; - let assembler = Assembler::default(); - let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -484,33 +294,7 @@ mod tests { let mut vector = vec![0; 8192]; let row_offsets = { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[cfg(feature = "jit")] - fn test_single_binary_jit() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); - let values: Vec> = - vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; - let a = BinaryArray::from_opt_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let assembler = Assembler::default(); - let row_offsets = { - write_batch_unchecked_jit( - &mut vector, - 0, - &batch, - 0, - schema.clone(), - &assembler, - )? - }; - let output_batch = - { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -524,51 +308,27 @@ mod tests { let mut vector = vec![0; 8192]; let row_offsets = { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[cfg(feature = "jit")] - fn test_single_binary_jit_null_free() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); - let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; - let a = BinaryArray::from_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let assembler = Assembler::default(); - let row_offsets = { - write_batch_unchecked_jit( - &mut vector, - 0, - &batch, - 0, - schema.clone(), - &assembler, - )? - }; - let output_batch = - { read_as_batch_jit(&vector, schema, row_offsets, &assembler)? }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } #[tokio::test] async fn test_with_parquet() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; let schema = exec.schema().clone(); - let batches = collect(exec, runtime).await?; + let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); let batch = &batches[0]; let mut vector = vec![0; 20480]; let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, row_offsets)? }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(*batch, output_batch); Ok(()) @@ -594,7 +354,7 @@ mod tests { )])); let vector = vec![0; 1024]; let row_offsets = vec![0]; - read_as_batch(&vector, schema, row_offsets).unwrap(); + read_as_batch(&vector, schema, &row_offsets).unwrap(); } async fn get_exec( diff --git a/datafusion/core/src/row/reader.rs b/datafusion/core/src/row/reader.rs index 3e2c453639876..4d9fb31368078 100644 --- a/datafusion/core/src/row/reader.rs +++ b/datafusion/core/src/row/reader.rs @@ -18,71 +18,33 @@ //! Accessing row from raw bytes use crate::error::{DataFusionError, Result}; -#[cfg(feature = "jit")] -use crate::reg_fn; -#[cfg(feature = "jit")] -use crate::row::fn_name; -use crate::row::{ - all_valid, get_offsets, schema_null_free, supported, NullBitsFormatter, -}; +use crate::row::layout::get_offsets; +use crate::row::validity::{all_valid, NullBitsFormatter}; +use crate::row::{row_supported, schema_null_free, MutableRecordBatch}; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use arrow::util::bit_util::{ceil, get_bit_raw}; -#[cfg(feature = "jit")] -use datafusion_jit::api::Assembler; -#[cfg(feature = "jit")] -use datafusion_jit::api::GeneratedFunction; -#[cfg(feature = "jit")] -use datafusion_jit::ast::{I64, PTR}; use std::sync::Arc; /// Read `data` of raw-bytes rows starting at `offsets` out to a record batch pub fn read_as_batch( data: &[u8], schema: Arc, - offsets: Vec, + offsets: &[usize], ) -> Result { let row_num = offsets.len(); let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema, data); + let mut row = RowReader::new(&schema); for offset in offsets.iter().take(row_num) { - row.point_to(*offset); + row.point_to(*offset, data); read_row(&row, &mut output, &schema); } output.output().map_err(DataFusionError::ArrowError) } -/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch -#[cfg(feature = "jit")] -pub fn read_as_batch_jit( - data: &[u8], - schema: Arc, - offsets: Vec, - assembler: &Assembler, -) -> Result { - let row_num = offsets.len(); - let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema, data); - register_read_functions(assembler)?; - let gen_func = gen_read_row(&schema, assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - let code_fn = unsafe { - std::mem::transmute::<_, fn(&RowReader, &mut MutableRecordBatch)>(code_ptr) - }; - - for offset in offsets.iter().take(row_num) { - row.point_to(*offset); - code_fn(&row, &mut output); - } - - output.output().map_err(DataFusionError::ArrowError) -} - macro_rules! get_idx { ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ $SELF.assert_index_valid($IDX); @@ -156,14 +118,14 @@ impl<'a> std::fmt::Debug for RowReader<'a> { impl<'a> RowReader<'a> { /// new - pub fn new(schema: &Arc, data: &'a [u8]) -> Self { - assert!(supported(schema)); + pub fn new(schema: &Arc) -> Self { + assert!(row_supported(schema)); let null_free = schema_null_free(schema); let field_count = schema.fields().len(); let null_width = if null_free { 0 } else { ceil(field_count, 8) }; let (field_offsets, _) = get_offsets(null_width, schema); Self { - data, + data: &[], base_offset: 0, field_count, null_width, @@ -173,8 +135,9 @@ impl<'a> RowReader<'a> { } /// Update this row to point to position `offset` in `base` - pub fn point_to(&mut self, offset: usize) { + pub fn point_to(&mut self, offset: usize, data: &'a [u8]) { self.base_offset = offset; + self.data = data; } #[inline] @@ -244,7 +207,7 @@ impl<'a> RowReader<'a> { let len = (offset_size & 0xffff_ffff) as usize; let varlena_offset = self.base_offset + offset; let bytes = &self.data[varlena_offset..varlena_offset + len]; - std::str::from_utf8(bytes).unwrap() + unsafe { std::str::from_utf8_unchecked(bytes) } } fn get_binary(&self, idx: usize) -> &[u8] { @@ -293,7 +256,8 @@ impl<'a> RowReader<'a> { } } -fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc) { +/// Read the row currently pointed by RowWriter to the output columnar batch buffer +pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc) { if row.null_free || row.all_valid() { for ((col_idx, to), field) in batch .arrays @@ -315,118 +279,10 @@ fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc &mut Box { - let arrays: &mut [Box] = batch.arrays.as_mut(); - &mut arrays[col_idx] -} - -#[cfg(feature = "jit")] -fn register_read_functions(asm: &Assembler) -> Result<()> { - let reader_param = vec![PTR, I64, PTR]; - reg_fn!(asm, get_array_mut, vec![PTR, I64], Some(PTR)); - reg_fn!(asm, read_field_bool, reader_param.clone(), None); - reg_fn!(asm, read_field_u8, reader_param.clone(), None); - reg_fn!(asm, read_field_u16, reader_param.clone(), None); - reg_fn!(asm, read_field_u32, reader_param.clone(), None); - reg_fn!(asm, read_field_u64, reader_param.clone(), None); - reg_fn!(asm, read_field_i8, reader_param.clone(), None); - reg_fn!(asm, read_field_i16, reader_param.clone(), None); - reg_fn!(asm, read_field_i32, reader_param.clone(), None); - reg_fn!(asm, read_field_i64, reader_param.clone(), None); - reg_fn!(asm, read_field_f32, reader_param.clone(), None); - reg_fn!(asm, read_field_f64, reader_param.clone(), None); - reg_fn!(asm, read_field_date32, reader_param.clone(), None); - reg_fn!(asm, read_field_date64, reader_param.clone(), None); - reg_fn!(asm, read_field_utf8, reader_param.clone(), None); - reg_fn!(asm, read_field_binary, reader_param.clone(), None); - reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i16_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_binary_null_free, reader_param, None); - Ok(()) -} - -#[cfg(feature = "jit")] -fn gen_read_row( - schema: &Arc, - assembler: &Assembler, -) -> Result { - use DataType::*; - let mut builder = assembler - .new_func_builder("read_row") - .param("row", PTR) - .param("batch", PTR); - let mut b = builder.enter_block(); - for (i, f) in schema.fields().iter().enumerate() { - let dt = f.data_type(); - let arr = format!("a{}", i); - b.declare_as( - &arr, - b.call("get_array_mut", vec![b.id("batch")?, b.lit_i(i as i64)])?, - )?; - let params = vec![b.id(&arr)?, b.lit_i(i as i64), b.id("row")?]; - if f.is_nullable() { - match dt { - Boolean => b.call_stmt("read_field_bool", params)?, - UInt8 => b.call_stmt("read_field_u8", params)?, - UInt16 => b.call_stmt("read_field_u16", params)?, - UInt32 => b.call_stmt("read_field_u32", params)?, - UInt64 => b.call_stmt("read_field_u64", params)?, - Int8 => b.call_stmt("read_field_i8", params)?, - Int16 => b.call_stmt("read_field_i16", params)?, - Int32 => b.call_stmt("read_field_i32", params)?, - Int64 => b.call_stmt("read_field_i64", params)?, - Float32 => b.call_stmt("read_field_f32", params)?, - Float64 => b.call_stmt("read_field_f64", params)?, - Date32 => b.call_stmt("read_field_date32", params)?, - Date64 => b.call_stmt("read_field_date64", params)?, - Utf8 => b.call_stmt("read_field_utf8", params)?, - Binary => b.call_stmt("read_field_binary", params)?, - _ => unimplemented!(), - } - } else { - match dt { - Boolean => b.call_stmt("read_field_bool_null_free", params)?, - UInt8 => b.call_stmt("read_field_u8_null_free", params)?, - UInt16 => b.call_stmt("read_field_u16_null_free", params)?, - UInt32 => b.call_stmt("read_field_u32_null_free", params)?, - UInt64 => b.call_stmt("read_field_u64_null_free", params)?, - Int8 => b.call_stmt("read_field_i8_null_free", params)?, - Int16 => b.call_stmt("read_field_i16_null_free", params)?, - Int32 => b.call_stmt("read_field_i32_null_free", params)?, - Int64 => b.call_stmt("read_field_i64_null_free", params)?, - Float32 => b.call_stmt("read_field_f32_null_free", params)?, - Float64 => b.call_stmt("read_field_f64_null_free", params)?, - Date32 => b.call_stmt("read_field_date32_null_free", params)?, - Date64 => b.call_stmt("read_field_date64_null_free", params)?, - Utf8 => b.call_stmt("read_field_utf8_null_free", params)?, - Binary => b.call_stmt("read_field_binary_null_free", params)?, - _ => unimplemented!(), - } - } - } - Ok(b.build()) -} - macro_rules! fn_read_field { ($NATIVE: ident, $ARRAY: ident) => { paste::item! { - fn [](to: &mut Box, col_idx: usize, row: &RowReader) { + pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { let to = to .as_any_mut() .downcast_mut::<$ARRAY>() @@ -436,7 +292,7 @@ macro_rules! fn_read_field { .unwrap(); } - fn [](to: &mut Box, col_idx: usize, row: &RowReader) { + pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { let to = to .as_any_mut() .downcast_mut::<$ARRAY>() @@ -464,7 +320,11 @@ fn_read_field!(date32, Date32Builder); fn_read_field!(date64, Date64Builder); fn_read_field!(utf8, StringBuilder); -fn read_field_binary(to: &mut Box, col_idx: usize, row: &RowReader) { +pub(crate) fn read_field_binary( + to: &mut Box, + col_idx: usize, + row: &RowReader, +) { let to = to.as_any_mut().downcast_mut::().unwrap(); if row.is_valid_at(col_idx) { to.append_value(row.get_binary(col_idx)).unwrap(); @@ -473,7 +333,7 @@ fn read_field_binary(to: &mut Box, col_idx: usize, row: &RowRe } } -fn read_field_binary_null_free( +pub(crate) fn read_field_binary_null_free( to: &mut Box, col_idx: usize, row: &RowReader, @@ -537,39 +397,3 @@ fn read_field_null_free( _ => unimplemented!(), } } - -struct MutableRecordBatch { - arrays: Vec>, - schema: Arc, -} - -impl MutableRecordBatch { - fn new(target_batch_size: usize, schema: Arc) -> Self { - let arrays = new_arrays(&schema, target_batch_size); - Self { arrays, schema } - } - - fn output(&mut self) -> ArrowResult { - let result = make_batch(self.schema.clone(), self.arrays.drain(..).collect()); - result - } -} - -fn new_arrays(schema: &Arc, batch_size: usize) -> Vec> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - make_builder(dt, batch_size) - }) - .collect::>() -} - -fn make_batch( - schema: Arc, - mut arrays: Vec>, -) -> ArrowResult { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - RecordBatch::try_new(schema, columns) -} diff --git a/datafusion/core/src/row/validity.rs b/datafusion/core/src/row/validity.rs new file mode 100644 index 0000000000000..45f5e19f1894f --- /dev/null +++ b/datafusion/core/src/row/validity.rs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row format validity utilities + +use arrow::util::bit_util::get_bit_raw; +use std::fmt::Write; + +const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; + +/// Returns if all fields are valid +pub fn all_valid(data: &[u8], n: usize) -> bool { + for item in data.iter().take(n / 8) { + if *item != ALL_VALID_MASK[7] { + return false; + } + } + if n % 8 == 0 { + true + } else { + data[n / 8] == ALL_VALID_MASK[n % 8 - 1] + } +} + +/// Show null bit for each field in a tuple, 1 for valid and 0 for null. +/// For a tuple with nine total fields, valid at field 0, 6, 7, 8 shows as `[10000011, 1]`. +pub struct NullBitsFormatter<'a> { + null_bits: &'a [u8], + field_count: usize, +} + +impl<'a> NullBitsFormatter<'a> { + /// new + pub fn new(null_bits: &'a [u8], field_count: usize) -> Self { + Self { + null_bits, + field_count, + } + } +} + +impl<'a> std::fmt::Debug for NullBitsFormatter<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut is_first = true; + let data = self.null_bits; + for i in 0..self.field_count { + if is_first { + f.write_char('[')?; + is_first = false; + } else if i % 8 == 0 { + f.write_str(", ")?; + } + if unsafe { get_bit_raw(data.as_ptr(), i) } { + f.write_char('1')?; + } else { + f.write_char('0')?; + } + } + f.write_char(']')?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::util::bit_util::{ceil, set_bit_raw, unset_bit_raw}; + use rand::Rng; + + fn test_validity(bs: &[bool]) { + let n = bs.len(); + let mut data = vec![0; ceil(n, 8)]; + for (i, b) in bs.iter().enumerate() { + if *b { + let data_argument = &mut data; + unsafe { + set_bit_raw(data_argument.as_mut_ptr(), i); + }; + } else { + let data_argument = &mut data; + unsafe { + unset_bit_raw(data_argument.as_mut_ptr(), i); + }; + } + } + let expected = bs.iter().all(|f| *f); + assert_eq!(all_valid(&data, bs.len()), expected); + } + + #[test] + fn test_all_valid() { + let sizes = [4, 8, 12, 16, 19, 23, 32, 44]; + for i in sizes { + { + // contains false + let input = { + let mut rng = rand::thread_rng(); + let mut input: Vec = vec![false; i]; + rng.fill(&mut input[..]); + input + }; + test_validity(&input); + } + + { + // all true + let input = vec![true; i]; + test_validity(&input); + } + } + } + + #[test] + fn test_formatter() -> std::fmt::Result { + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001], 8)), + "[10000011]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1], 9)), + "[10000011, 1]" + ); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 2)), "[10]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 3)), "[100]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 4)), "[1000]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 5)), "[10000]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 6)), "[100000]"); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[1], 7)), + "[1000000]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[1], 8)), + "[10000000]" + ); + // extra bytes are ignored + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1, 1], 9)), + "[10000011, 1]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1], 16)), + "[10000011, 10000000]" + ); + Ok(()) + } +} diff --git a/datafusion/core/src/row/writer.rs b/datafusion/core/src/row/writer.rs index 9923ebfb5105b..9cb208d03a219 100644 --- a/datafusion/core/src/row/writer.rs +++ b/datafusion/core/src/row/writer.rs @@ -17,27 +17,13 @@ //! Reusable row writer backed by Vec to stitch attributes together -#[cfg(feature = "jit")] use crate::error::Result; -#[cfg(feature = "jit")] -use crate::reg_fn; -#[cfg(feature = "jit")] -use crate::row::fn_name; -use crate::row::{ - estimate_row_width, fixed_size, get_offsets, schema_null_free, supported, -}; +use crate::row::layout::{estimate_row_width, get_offsets}; +use crate::row::{fixed_size, row_supported, schema_null_free}; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw}; -#[cfg(feature = "jit")] -use datafusion_jit::api::CodeBlock; -#[cfg(feature = "jit")] -use datafusion_jit::api::{Assembler, GeneratedFunction}; -#[cfg(feature = "jit")] -use datafusion_jit::ast::Expr; -#[cfg(feature = "jit")] -use datafusion_jit::ast::{BOOL, I64, PTR}; use std::cmp::max; use std::sync::Arc; @@ -67,45 +53,6 @@ pub fn write_batch_unchecked( offsets } -/// Append batch from `row_idx` to `output` buffer start from `offset` -/// # Panics -/// -/// This function will panic if the output buffer doesn't have enough space to hold all the rows -#[cfg(feature = "jit")] -pub fn write_batch_unchecked_jit( - output: &mut [u8], - offset: usize, - batch: &RecordBatch, - row_idx: usize, - schema: Arc, - assembler: &Assembler, -) -> Result> { - let mut writer = RowWriter::new(&schema); - let mut current_offset = offset; - let mut offsets = vec![]; - register_write_functions(assembler)?; - let gen_func = gen_write_row(&schema, assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - - let code_fn = unsafe { - std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) - }; - - for cur_row in row_idx..batch.num_rows() { - offsets.push(current_offset); - code_fn(&mut writer, cur_row, batch); - writer.end_padding(); - let row_width = writer.row_width; - output[current_offset..current_offset + row_width] - .copy_from_slice(writer.get_row()); - current_offset += row_width; - writer.reset() - } - Ok(offsets) -} - -#[cfg(feature = "jit")] /// bench interpreted version write #[inline(never)] pub fn bench_write_batch( @@ -127,35 +74,6 @@ pub fn bench_write_batch( Ok(lengths) } -#[cfg(feature = "jit")] -/// bench jit version write -#[inline(never)] -pub fn bench_write_batch_jit( - batches: &[Vec], - schema: Arc, -) -> Result> { - let assembler = Assembler::default(); - let mut writer = RowWriter::new(&schema); - let mut lengths = vec![]; - register_write_functions(&assembler)?; - let gen_func = gen_write_row(&schema, &assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - let code_fn = unsafe { - std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) - }; - - for batch in batches.iter().flatten() { - for cur_row in 0..batch.num_rows() { - code_fn(&mut writer, cur_row, batch); - writer.end_padding(); - lengths.push(writer.row_width); - writer.reset() - } - } - Ok(lengths) -} - macro_rules! set_idx { ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ $SELF.assert_index_valid($IDX); @@ -183,7 +101,7 @@ pub struct RowWriter { /// Total number of fields for each tuple. field_count: usize, /// Length in bytes for the current tuple, 8-bytes word aligned. - row_width: usize, + pub(crate) row_width: usize, /// The number of bytes used to store null bits for each field. null_width: usize, /// Length in bytes for `values` part of the current tuple. @@ -203,12 +121,12 @@ pub struct RowWriter { impl RowWriter { /// new pub fn new(schema: &Arc) -> Self { - assert!(supported(schema)); + assert!(row_supported(schema)); let null_free = schema_null_free(schema); let field_count = schema.fields().len(); let null_width = if null_free { 0 } else { ceil(field_count, 8) }; let (field_offsets, values_width) = get_offsets(null_width, schema); - let mut init_capacity = estimate_row_width(null_width, schema); + let mut init_capacity = estimate_row_width(schema); if !fixed_size(schema) { // double the capacity to avoid repeated resize init_capacity *= 2; @@ -239,7 +157,7 @@ impl RowWriter { assert!(idx < self.field_count); } - fn set_null_at(&mut self, idx: usize) { + pub(crate) fn set_null_at(&mut self, idx: usize) { assert!( !self.null_free, "Unexpected call to set_null_at on null-free row writer" @@ -250,7 +168,7 @@ impl RowWriter { } } - fn set_non_null_at(&mut self, idx: usize) { + pub(crate) fn set_non_null_at(&mut self, idx: usize) { assert!( !self.null_free, "Unexpected call to set_non_null_at on null-free row writer" @@ -327,7 +245,7 @@ impl RowWriter { } /// End each row at 8-byte word boundary. - fn end_padding(&mut self) { + pub(crate) fn end_padding(&mut self) { let payload_width = self.current_width(); self.row_width = round_upto_power_of_2(payload_width, 8); if self.data.capacity() < self.row_width { @@ -335,13 +253,14 @@ impl RowWriter { } } - fn get_row(&self) -> &[u8] { + /// Get raw bytes + pub fn get_row(&self) -> &[u8] { &self.data[0..self.row_width] } } /// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width -fn write_row( +pub fn write_row( row: &mut RowWriter, row_idx: usize, schema: &Arc, @@ -367,126 +286,10 @@ fn write_row( row.row_width } -// we could remove this function wrapper once we find a way to call the trait method directly. -#[cfg(feature = "jit")] -fn is_null(col: &Arc, row_idx: usize) -> bool { - col.is_null(row_idx) -} - -#[cfg(feature = "jit")] -fn register_write_functions(asm: &Assembler) -> Result<()> { - let reader_param = vec![PTR, I64, PTR]; - reg_fn!(asm, RecordBatch::column, vec![PTR, I64], Some(PTR)); - reg_fn!(asm, RowWriter::set_null_at, vec![PTR, I64], None); - reg_fn!(asm, RowWriter::set_non_null_at, vec![PTR, I64], None); - reg_fn!(asm, is_null, vec![PTR, I64], Some(BOOL)); - reg_fn!(asm, write_field_bool, reader_param.clone(), None); - reg_fn!(asm, write_field_u8, reader_param.clone(), None); - reg_fn!(asm, write_field_u16, reader_param.clone(), None); - reg_fn!(asm, write_field_u32, reader_param.clone(), None); - reg_fn!(asm, write_field_u64, reader_param.clone(), None); - reg_fn!(asm, write_field_i8, reader_param.clone(), None); - reg_fn!(asm, write_field_i16, reader_param.clone(), None); - reg_fn!(asm, write_field_i32, reader_param.clone(), None); - reg_fn!(asm, write_field_i64, reader_param.clone(), None); - reg_fn!(asm, write_field_f32, reader_param.clone(), None); - reg_fn!(asm, write_field_f64, reader_param.clone(), None); - reg_fn!(asm, write_field_date32, reader_param.clone(), None); - reg_fn!(asm, write_field_date64, reader_param.clone(), None); - reg_fn!(asm, write_field_utf8, reader_param.clone(), None); - reg_fn!(asm, write_field_binary, reader_param, None); - Ok(()) -} - -#[cfg(feature = "jit")] -fn gen_write_row( - schema: &Arc, - assembler: &Assembler, -) -> Result { - let mut builder = assembler - .new_func_builder("write_row") - .param("row", PTR) - .param("row_idx", I64) - .param("batch", PTR); - let null_free = schema_null_free(schema); - let mut b = builder.enter_block(); - for (i, f) in schema.fields().iter().enumerate() { - let dt = f.data_type(); - let arr = format!("a{}", i); - b.declare_as( - &arr, - b.call("column", vec![b.id("batch")?, b.lit_i(i as i64)])?, - )?; - if f.is_nullable() { - b.if_block( - |c| c.call("is_null", vec![c.id(&arr)?, c.id("row_idx")?]), - |t| { - t.call_stmt("set_null_at", vec![t.id("row")?, t.lit_i(i as i64)])?; - Ok(()) - }, - |e| { - e.call_stmt( - "set_non_null_at", - vec![e.id("row")?, e.lit_i(i as i64)], - )?; - let params = vec![ - e.id("row")?, - e.id(&arr)?, - e.lit_i(i as i64), - e.id("row_idx")?, - ]; - write_typed_field_stmt(dt, e, params)?; - Ok(()) - }, - )?; - } else { - if !null_free { - b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?; - } - let params = vec![ - b.id("row")?, - b.id(&arr)?, - b.lit_i(i as i64), - b.id("row_idx")?, - ]; - write_typed_field_stmt(dt, &mut b, params)?; - } - } - Ok(b.build()) -} - -#[cfg(feature = "jit")] -fn write_typed_field_stmt<'a>( - dt: &DataType, - b: &mut CodeBlock<'a>, - params: Vec, -) -> Result<()> { - use DataType::*; - match dt { - Boolean => b.call_stmt("write_field_bool", params)?, - UInt8 => b.call_stmt("write_field_u8", params)?, - UInt16 => b.call_stmt("write_field_u16", params)?, - UInt32 => b.call_stmt("write_field_u32", params)?, - UInt64 => b.call_stmt("write_field_u64", params)?, - Int8 => b.call_stmt("write_field_i8", params)?, - Int16 => b.call_stmt("write_field_i16", params)?, - Int32 => b.call_stmt("write_field_i32", params)?, - Int64 => b.call_stmt("write_field_i64", params)?, - Float32 => b.call_stmt("write_field_f32", params)?, - Float64 => b.call_stmt("write_field_f64", params)?, - Date32 => b.call_stmt("write_field_date32", params)?, - Date64 => b.call_stmt("write_field_date64", params)?, - Utf8 => b.call_stmt("write_field_utf8", params)?, - Binary => b.call_stmt("write_field_binary", params)?, - _ => unimplemented!(), - } - Ok(()) -} - macro_rules! fn_write_field { ($NATIVE: ident, $ARRAY: ident) => { paste::item! { - fn [](to: &mut RowWriter, from: &Arc, col_idx: usize, row_idx: usize) { + pub(crate) fn [](to: &mut RowWriter, from: &Arc, col_idx: usize, row_idx: usize) { let from = from .as_any() .downcast_ref::<$ARRAY>() @@ -509,7 +312,7 @@ fn_write_field!(i64, Int64Array); fn_write_field!(f32, Float32Array); fn_write_field!(f64, Float64Array); -fn write_field_date32( +pub(crate) fn write_field_date32( to: &mut RowWriter, from: &Arc, col_idx: usize, @@ -519,7 +322,7 @@ fn write_field_date32( to.set_date32(col_idx, from.value(row_idx)); } -fn write_field_date64( +pub(crate) fn write_field_date64( to: &mut RowWriter, from: &Arc, col_idx: usize, @@ -529,7 +332,7 @@ fn write_field_date64( to.set_date64(col_idx, from.value(row_idx)); } -fn write_field_utf8( +pub(crate) fn write_field_utf8( to: &mut RowWriter, from: &Arc, col_idx: usize, @@ -545,7 +348,7 @@ fn write_field_utf8( to.set_utf8(col_idx, s); } -fn write_field_binary( +pub(crate) fn write_field_binary( to: &mut RowWriter, from: &Arc, col_idx: usize,