Skip to content

Commit

Permalink
Support EXPLAIN COPY
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 16, 2023
1 parent 518efca commit 108b78e
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 40 deletions.
52 changes: 32 additions & 20 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1698,30 +1698,42 @@ impl SessionState {
}

let mut visitor = RelationVisitor(&mut relations);
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(&mut visitor);
}
DFStatement::CreateExternalTable(table) => {
visitor
.0
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
DFStatement::DescribeTableStmt(table) => visitor.insert(&table.table_name),
DFStatement::CopyTo(CopyToStatement {
source,
target: _,
options: _,
}) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert(table_name);
fn visit_statement<'a>(
statement: &DFStatement,
visitor: &mut RelationVisitor<'a>,
) {
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(visitor);
}
CopyToSource::Query(query) => {
query.visit(&mut visitor);
DFStatement::CreateExternalTable(table) => {
visitor
.0
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
},
DFStatement::DescribeTableStmt(table) => {
visitor.insert(&table.table_name)
}
DFStatement::CopyTo(CopyToStatement {
source,
target: _,
options: _,
}) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert(table_name);
}
CopyToSource::Query(query) => {
query.visit(visitor);
}
},
DFStatement::Explain(explain) => {
visit_statement(&explain.statement, visitor)
}
}
}

visit_statement(statement, &mut visitor);

// Always include information_schema if available
if self.config.information_schema() {
for s in INFORMATION_SCHEMA_TABLES {
Expand Down
91 changes: 81 additions & 10 deletions datafusion/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,35 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
Ok(s.to_uppercase())
}

/// DataFusion specific EXPLAIN (needed so we can EXPLAIN datafusion
/// specific COPY and other statements)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplainStatement {
pub analyze: bool,
pub verbose: bool,
pub statement: Box<Statement>,
}

impl fmt::Display for ExplainStatement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
analyze,
verbose,
statement,
} = self;

write!(f, "EXPLAIN ")?;
if *analyze {
write!(f, "ANALYZE ")?;
}
if *verbose {
write!(f, "VERBOSE ")?;
}

write!(f, "{statement}")
}
}

/// DataFusion extension DDL for `COPY`
///
/// # Syntax:
Expand Down Expand Up @@ -204,6 +233,8 @@ pub enum Statement {
DescribeTableStmt(DescribeTableStmt),
/// Extension: `COPY TO`
CopyTo(CopyToStatement),
/// EXPLAIN for extensions
Explain(ExplainStatement),
}

impl fmt::Display for Statement {
Expand All @@ -213,11 +244,12 @@ impl fmt::Display for Statement {
Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"),
Statement::DescribeTableStmt(_) => write!(f, "DESCRIBE TABLE ..."),
Statement::CopyTo(stmt) => write!(f, "{stmt}"),
Statement::Explain(stmt) => write!(f, "{stmt}"),
}
}
}

/// DataFusion SQL Parser based on [`sqlparser`]
/// Datafusion1 SQL Parser based on [`sqlparser`]
///
/// This parser handles DataFusion specific statements, delegating to
/// [`Parser`](sqlparser::parser::Parser) for other SQL statements.
Expand Down Expand Up @@ -298,24 +330,24 @@ impl<'a> DFParser<'a> {
Token::Word(w) => {
match w.keyword {
Keyword::CREATE => {
// move one token forward
self.parser.next_token();
// use custom parsing
self.parser.next_token(); // CREATE
self.parse_create()
}
Keyword::COPY => {
// move one token forward
self.parser.next_token();
self.parser.next_token(); // COPY
self.parse_copy()
}
Keyword::DESCRIBE => {
// move one token forward
self.parser.next_token();
// use custom parsing
self.parser.next_token(); // DESCRIBE
self.parse_describe()
}
Keyword::EXPLAIN => {
// (TODO parse all supported statements)
self.parser.next_token(); // EXPLAIN
self.parse_explain()
}
_ => {
// use the native parser
// use sqlparser-rs parser
Ok(Statement::Statement(Box::from(
self.parser.parse_statement()?,
)))
Expand Down Expand Up @@ -412,6 +444,19 @@ impl<'a> DFParser<'a> {
}
}

/// Parse a SQL `EXPLAIN`
pub fn parse_explain(&mut self) -> Result<Statement, ParserError> {
let analyze = self.parser.parse_keyword(Keyword::ANALYZE);
let verbose = self.parser.parse_keyword(Keyword::VERBOSE);
let statement = self.parse_statement()?;

Ok(Statement::Explain(ExplainStatement {
statement: Box::new(statement),
analyze,
verbose,
}))
}

/// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
if self.parser.parse_keyword(Keyword::EXTERNAL) {
Expand Down Expand Up @@ -1283,6 +1328,32 @@ mod tests {
Ok(())
}

#[test]
fn explain_copy_to_table_to_table() -> Result<(), ParserError> {
let cases = vec![
("EXPLAIN COPY foo TO bar", false, false),
("EXPLAIN ANALYZE COPY foo TO bar", true, false),
("EXPLAIN VERBOSE COPY foo TO bar", false, true),
("EXPLAIN ANALYZE VERBOSE COPY foo TO bar", true, true),
];
for (sql, analyze, verbose) in cases {
println!("sql: {sql}, analyze: {analyze}, verbose: {verbose}");

let expected_copy = Statement::CopyTo(CopyToStatement {
source: object_name("foo"),
target: "bar".to_string(),
options: HashMap::new(),
});
let expected = Statement::Explain(ExplainStatement {
analyze,
verbose,
statement: Box::new(expected_copy),
});
assert_eq!(verified_stmt(sql), expected);
}
Ok(())
}

#[test]
fn copy_to_query_to_table() -> Result<(), ParserError> {
let statement = verified_stmt("SELECT 1");
Expand Down
22 changes: 17 additions & 5 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
LexOrdering, Statement as DFStatement,
ExplainStatement, LexOrdering, Statement as DFStatement,
};
use crate::planner::{
object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
Expand Down Expand Up @@ -93,6 +93,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
DFStatement::DescribeTableStmt(s) => self.describe_table_to_plan(s),
DFStatement::CopyTo(s) => self.copy_to_plan(s),
DFStatement::Explain(ExplainStatement {
verbose,
analyze,
statement,
}) => self.explain_to_plan(verbose, analyze, *statement),
}
}

Expand All @@ -116,7 +121,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
format: _,
describe_alias: _,
..
} => self.explain_statement_to_plan(verbose, analyze, *statement),
} => {
self.explain_to_plan(verbose, analyze, DFStatement::Statement(statement))
}
Statement::Query(query) => self.query_to_plan(*query, planner_context),
Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
Statement::SetVariable {
Expand Down Expand Up @@ -706,13 +713,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

/// Generate a plan for EXPLAIN ... that will print out a plan
///
fn explain_statement_to_plan(
/// Note this is the sqlparser explain statement, not the
/// datafusion `EXPLAIN` statement.
fn explain_to_plan(
&self,
verbose: bool,
analyze: bool,
statement: Statement,
statement: DFStatement,
) -> Result<LogicalPlan> {
let plan = self.sql_statement_to_plan(statement)?;
let plan = self.statement_to_plan(statement)?;
if matches!(plan, LogicalPlan::Explain(_)) {
return plan_err!("Nested explain not supported");
}
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = schema.to_dfschema_ref()?;
Expand Down
12 changes: 12 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
quick_test(sql, plan);
}

#[test]
fn plan_explain_copy_to() {
let sql = "EXPLAIN COPY test_decimal to 'output.csv'";
let plan = r#"
Explain
CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
TableScan: test_decimal
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn plan_copy_to_query() {
let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'";
Expand Down
13 changes: 10 additions & 3 deletions datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ COPY source_table TO 'test_files/scratch/table' (format parquet, per_thread_outp
----
2

#Explain copy queries not currently working
query error DataFusion error: This feature is not implemented: Unsupported SQL statement: Some\("COPY source_table TO 'test_files/scratch/table'"\)
# Error case
query error DataFusion error: Error during planning: Copy To format not explicitly set and unable to get file extension!
EXPLAIN COPY source_table to 'test_files/scratch/table'

query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: source_table"\)
query TT
EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, per_thread_output true)
----
logical_plan
CopyTo: format=parquet output_url=test_files/scratch/table per_thread_output=true options: (,per_thread_output true,format parquet)
--TableScan: source_table projection=[col1, col2]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
--MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

# Copy more files to directory via query
query IT
Expand Down
8 changes: 6 additions & 2 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,17 @@ set datafusion.explain.physical_plan_only = false


## explain nested
statement error Explain must be root of the plan
query error DataFusion error: Error during planning: Nested explain not supported
EXPLAIN explain select 1

## explain nested
statement error DataFusion error: Error during planning: Nested explain not supported
EXPLAIN EXPLAIN explain select 1

statement ok
set datafusion.explain.physical_plan_only = true

statement error Explain must be root of the plan
statement error DataFusion error: Error during planning: Nested explain not supported
EXPLAIN explain select 1

statement ok
Expand Down

0 comments on commit 108b78e

Please sign in to comment.