From 70f01042caaf97ff36dffe61cb13eac2906905e8 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 29 Apr 2024 22:48:09 +0800 Subject: [PATCH 1/3] feat(CLI): print column headers for empty query results --- datafusion-cli/src/command.rs | 16 ++-- datafusion-cli/src/exec.rs | 5 +- datafusion-cli/src/print_format.rs | 130 ++++++++++++++++++++++------ datafusion-cli/src/print_options.rs | 7 +- 4 files changed, 121 insertions(+), 37 deletions(-) diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index d3d7b65f0a509..d65b107f87171 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -17,7 +17,7 @@ //! Command within CLI -use crate::exec::exec_from_lines; +use crate::exec::{exec_and_print, exec_from_lines}; use crate::functions::{display_all_functions, Function}; use crate::print_format::PrintFormat; use crate::print_options::PrintOptions; @@ -60,16 +60,16 @@ impl Command { ) -> Result<()> { let now = Instant::now(); match self { - Self::Help => print_options.print_batches(&[all_commands_info()], now), + Self::Help => { + let command_batch = all_commands_info(); + print_options.print_batches(command_batch.schema(), &[command_batch], now) + } Self::ListTables => { - let df = ctx.sql("SHOW TABLES").await?; - let batches = df.collect().await?; - print_options.print_batches(&batches, now) + exec_and_print(ctx, print_options, "SHOW TABLES".into()).await } Self::DescribeTableStmt(name) => { - let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?; - let batches = df.collect().await?; - print_options.print_batches(&batches, now) + exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {}", name)) + .await } Self::Include(filename) => { if let Some(filename) = filename { diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 5fbcea0c06831..19bff0528b778 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -203,7 +203,7 @@ pub async fn exec_from_repl( rl.save_history(".history") } -async fn exec_and_print( +pub(super) async fn exec_and_print( ctx: &mut SessionContext, print_options: &PrintOptions, sql: String, @@ -235,8 +235,9 @@ async fn exec_and_print( let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { + let schema = physical_plan.schema(); let results = collect(physical_plan, task_ctx.clone()).await?; - adjusted.into_inner().print_batches(&results, now)?; + adjusted.into_inner().print_batches(schema, &results, now)?; } } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 2de52be612bb5..c95bde7fc6c71 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -22,6 +22,7 @@ use std::str::FromStr; use crate::print_options::MaxRows; use arrow::csv::writer::WriterBuilder; +use arrow::datatypes::SchemaRef; use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; @@ -157,6 +158,7 @@ impl PrintFormat { pub fn print_batches( &self, writer: &mut W, + schema: SchemaRef, batches: &[RecordBatch], maxrows: MaxRows, with_header: bool, @@ -168,7 +170,7 @@ impl PrintFormat { .cloned() .collect(); if batches.is_empty() { - return Ok(()); + return self.print_empty(writer, schema); } match self { @@ -186,6 +188,27 @@ impl PrintFormat { Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches), } } + + /// Print when the result batches contain no rows + fn print_empty( + &self, + writer: &mut W, + schema: SchemaRef, + ) -> Result<()> { + match self { + // Print column headers for Table format + Self::Table if !schema.fields().is_empty() => { + let empty_batch = RecordBatch::new_empty(schema); + let formatted = pretty_format_batches_with_options( + &[empty_batch], + &DEFAULT_FORMAT_OPTIONS, + )?; + writeln!(writer, "{}", formatted)?; + } + _ => {} + } + Ok(()) + } } #[cfg(test)] @@ -193,7 +216,7 @@ mod tests { use super::*; use std::sync::Arc; - use arrow::array::{ArrayRef, Int32Array}; + use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; #[test] @@ -201,7 +224,6 @@ mod tests { for format in [ PrintFormat::Csv, PrintFormat::Tsv, - PrintFormat::Table, PrintFormat::Json, PrintFormat::NdJson, PrintFormat::Automatic, @@ -209,10 +231,26 @@ mod tests { // no output for empty batches, even with header set PrintBatchesTest::new() .with_format(format) + .with_schema(three_column_schema()) .with_batches(vec![]) .with_expected(&[""]) .run(); } + + // output column headers for empty batches when format is Table + #[rustfmt::skip] + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "+---+---+---+", + ]; + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_schema(three_column_schema()) + .with_batches(vec![]) + .with_expected(expected) + .run(); } #[test] @@ -385,6 +423,7 @@ mod tests { for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { PrintBatchesTest::new() .with_format(PrintFormat::Table) + .with_schema(one_column_schema()) .with_batches(vec![one_column_batch()]) .with_maxrows(max_rows) .with_expected(expected) @@ -450,15 +489,15 @@ mod tests { let empty_batch = RecordBatch::new_empty(batch.schema()); #[rustfmt::skip] - let expected =&[ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| 2 |", - "| 3 |", - "+---+", - ]; + let expected =&[ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "+---+", + ]; PrintBatchesTest::new() .with_format(PrintFormat::Table) @@ -468,14 +507,32 @@ mod tests { } #[test] - fn test_print_batches_empty_batches_no_header() { + fn test_print_batches_empty_batch() { let empty_batch = RecordBatch::new_empty(one_column_batch().schema()); - // empty batches should not print a header - let expected = &[""]; + // Print column headers for empty batch when format is Table + #[rustfmt::skip] + let expected =&[ + "+---+", + "| a |", + "+---+", + "+---+", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_schema(one_column_schema()) + .with_batches(vec![empty_batch]) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + // No output for empty batch when schema contains no columns + let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + let expected = &[""]; PrintBatchesTest::new() .with_format(PrintFormat::Table) + .with_schema(Arc::new(Schema::empty())) .with_batches(vec![empty_batch]) .with_header(WithHeader::Yes) .with_expected(expected) @@ -485,6 +542,7 @@ mod tests { #[derive(Debug)] struct PrintBatchesTest { format: PrintFormat, + schema: SchemaRef, batches: Vec, maxrows: MaxRows, with_header: WithHeader, @@ -504,6 +562,7 @@ mod tests { fn new() -> Self { Self { format: PrintFormat::Table, + schema: Arc::new(Schema::empty()), batches: vec![], maxrows: MaxRows::Unlimited, with_header: WithHeader::Ignored, @@ -517,6 +576,12 @@ mod tests { self } + // set the schema + fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = schema; + self + } + /// set the batches to convert fn with_batches(mut self, batches: Vec) -> Self { self.batches = batches; @@ -573,21 +638,31 @@ mod tests { fn output_with_header(&self, with_header: bool) -> String { let mut buffer: Vec = vec![]; self.format - .print_batches(&mut buffer, &self.batches, self.maxrows, with_header) + .print_batches( + &mut buffer, + self.schema.clone(), + &self.batches, + self.maxrows, + with_header, + ) .unwrap(); String::from_utf8(buffer).unwrap() } } - /// Return a batch with three columns and three rows - fn three_column_batch() -> RecordBatch { - let schema = Arc::new(Schema::new(vec![ + /// Return a schema with three columns + fn three_column_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), - ])); + ])) + } + + /// Return a batch with three columns and three rows + fn three_column_batch() -> RecordBatch { RecordBatch::try_new( - schema, + three_column_schema(), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![4, 5, 6])), @@ -597,12 +672,17 @@ mod tests { .unwrap() } + /// Return a schema with one column + fn one_column_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) + } + /// return a batch with one column and three rows fn one_column_batch() -> RecordBatch { - RecordBatch::try_from_iter(vec![( - "a", - Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, - )]) + RecordBatch::try_new( + one_column_schema(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) .unwrap() } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index bede5dd15eb66..e80cc55663ae2 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::instant::Instant; use std::fmt::{Display, Formatter}; use std::io::Write; use std::pin::Pin; @@ -23,7 +22,9 @@ use std::str::FromStr; use crate::print_format::PrintFormat; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use datafusion::common::instant::Instant; use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; @@ -98,6 +99,7 @@ impl PrintOptions { /// Print the batches to stdout using the specified format pub fn print_batches( &self, + schema: SchemaRef, batches: &[RecordBatch], query_start_time: Instant, ) -> Result<()> { @@ -105,7 +107,7 @@ impl PrintOptions { let mut writer = stdout.lock(); self.format - .print_batches(&mut writer, batches, self.maxrows, true)?; + .print_batches(&mut writer, schema, batches, self.maxrows, true)?; let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); let formatted_exec_details = get_execution_details_formatted( @@ -148,6 +150,7 @@ impl PrintOptions { row_count += batch.num_rows(); self.format.print_batches( &mut writer, + batch.schema(), &[batch], MaxRows::Unlimited, with_header, From bd4b297fd1a2ba8e0406f468233cd608435caef8 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 29 Apr 2024 23:16:14 +0800 Subject: [PATCH 2/3] Narrow now()'s scope --- datafusion-cli/src/command.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index d65b107f87171..be6393351aed4 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -58,9 +58,9 @@ impl Command { ctx: &mut SessionContext, print_options: &mut PrintOptions, ) -> Result<()> { - let now = Instant::now(); match self { Self::Help => { + let now = Instant::now(); let command_batch = all_commands_info(); print_options.print_batches(command_batch.schema(), &[command_batch], now) } From 2ce4e19a3360f8deb26d12a527bd7826ed5cf30b Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 29 Apr 2024 23:18:49 +0800 Subject: [PATCH 3/3] retry ci