Skip to content

Commit

Permalink
feat: table support for excel
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Jul 26, 2024
1 parent cff3020 commit 43be262
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 10 deletions.
53 changes: 46 additions & 7 deletions crates/datasources/src/excel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,36 @@ impl ExcelTable {
store_access: Arc<dyn ObjStoreAccess>,
source_url: DatasourceUrl,
sheet_name: Option<String>,
table_name: Option<String>,
has_header: bool,
) -> Result<ExcelTable, ExcelError> {
match source_url {
DatasourceUrl::File(path) => {
let path = ioutil::resolve_path(&path)?;
let mut sheet = calamine::open_workbook_auto(path)?;
let mut sheets = calamine::open_workbook_auto(path)?;

let first_sheet = sheet_name.unwrap_or_else(|| {
let sheets = sheet.sheet_names();
sheets.first().expect("file has a sheet").to_owned()
let names = sheets.sheet_names();
names.first().expect("file has a sheet").to_owned()
});

Ok(ExcelTable {
cell_range: sheet.worksheet_range(&first_sheet)?,
cell_range: match &table_name {
Some(name) => {
match sheets {
calamine::Sheets::Xlsx(mut sheet) => {
sheet.load_tables()?;

sheet.table_by_name(name)?.data().to_owned()
}
_ => return Err(ExcelError::Load(
"specifying excel table is only compatible with xslx workbooks"
.to_string(),
)),
}
}
None => sheets.worksheet_range(&first_sheet)?,
},
has_header,
})
}
Expand All @@ -72,7 +88,8 @@ impl ExcelTable {

let store = accessor.into_object_store();

excel_table_from_object(store.as_ref(), meta, sheet_name, has_header).await
excel_table_from_object(store.as_ref(), meta, sheet_name, table_name, has_header)
.await
}
}
}
Expand All @@ -82,20 +99,41 @@ pub async fn excel_table_from_object(
store: &dyn ObjectStore,
meta: ObjectMeta,
sheet_name: Option<String>,
table_name: Option<String>,
has_header: bool,
) -> Result<ExcelTable, ExcelError> {
let bs = store.get(&meta.location).await?.bytes().await?;

let buffer = Cursor::new(bs);
let mut sheets: Sheets<_> = calamine::open_workbook_auto_from_rs(buffer)?;
let mut sheets: Sheets<_> = match table_name {
Some(_) => Sheets::Xlsx(calamine::open_workbook_from_rs::<calamine::Xlsx<_>, _>(
buffer,
)?),
None => calamine::open_workbook_auto_from_rs(buffer)?,
};

let first_sheet = sheet_name.unwrap_or_else(|| {
let sheets = sheets.sheet_names();
sheets.first().expect("file has a sheet").to_owned()
});


Ok(ExcelTable {
cell_range: sheets.worksheet_range(&first_sheet)?,
cell_range: match &table_name {
Some(name) => match sheets {
calamine::Sheets::Xlsx(mut sheet) => {
sheet.load_tables()?;

sheet.table_by_name(name)?.data().to_owned()
}
_ => {
return Err(ExcelError::Load(
"specifying excel table is only compatible with xslx workbooks".to_string(),
))
}
},
None => sheets.worksheet_range(&first_sheet)?,
},
has_header,
})
}
Expand All @@ -108,6 +146,7 @@ fn xlsx_sheet_value_to_record_batch(
infer_schema_length: usize,
) -> Result<RecordBatch, ExcelError> {
let schema = infer_schema(&r, has_header, infer_schema_length)?;

let arrays = schema
.fields()
.iter()
Expand Down
1 change: 1 addition & 0 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ message TableOptionsExcel {
optional string compression = 4;
optional string sheet_name = 5;
optional bool has_header = 6;
optional string table_name = 7;
}

message TableOptionsSnowflake {
Expand Down
3 changes: 3 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,7 @@ pub struct TableOptionsExcel {
pub file_type: Option<String>,
pub compression: Option<String>,
pub sheet_name: Option<String>,
pub table_name: Option<String>,
pub has_header: Option<bool>,
}

Expand All @@ -1408,6 +1409,7 @@ impl TryFrom<options::TableOptionsExcel> for TableOptionsExcel {
compression: value.compression,
sheet_name: value.sheet_name,
has_header: value.has_header,
table_name: value.table_name,
})
}
}
Expand All @@ -1421,6 +1423,7 @@ impl From<TableOptionsExcel> for options::TableOptionsExcel {
compression: value.compression,
sheet_name: value.sheet_name,
has_header: value.has_header,
table_name: value.table_name,
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions crates/sqlbuiltins/src/functions/table/excel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub struct ExcelScan;

impl ConstBuiltinFunction for ExcelScan {
const NAME: &'static str = "read_excel";
const DESCRIPTION: &'static str = "Reads an Excel file from the local filesystem";
const DESCRIPTION: &'static str = "Reads an Excel file.";
const EXAMPLE: &'static str =
"SELECT * FROM read_excel('file:///path/to/file.xlsx', sheet_name => 'Sheet1')";
"SELECT * FROM read_excel('file:///path/to/file.xlsx', sheet_name => 'Sheet1', table_name => 'table')";
const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning;
const ALIASES: &'static [&'static str] = &["read_xlsx"];

Expand Down Expand Up @@ -99,7 +99,12 @@ impl TableFunc for ExcelScan {
.transpose()?
.unwrap_or(100);

let table = ExcelTable::open(store_access, source_url, sheet_name, has_header)
let table_name: Option<String> = opts
.remove("table_name")
.map(FuncParamValue::try_into)
.transpose()?;

let table = ExcelTable::open(store_access, source_url, sheet_name, table_name, has_header)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

Expand Down
1 change: 1 addition & 0 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ impl TableFunc for CloudUpload {
storage.store.inner.as_ref(),
meta,
/* sheet_name = */ None,
/* table_name = */ None,
/* has_header = */ true,
)
.await?;
Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl<'a> ExternalDispatcher<'a> {
storage_options,
has_header,
sheet_name,
table_name,
..
}) => {
let source_url = DatasourceUrl::try_new(location)?;
Expand All @@ -454,6 +455,7 @@ impl<'a> ExternalDispatcher<'a> {
store_access,
source_url,
sheet_name.to_owned(),
table_name.to_owned(),
has_header.unwrap_or(true),
)
.await?;
Expand Down
6 changes: 6 additions & 0 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,11 @@ impl<'a> SessionPlanner<'a> {
.get("sheet_name")
.map(|val| val.to_owned());

let table_name = storage_options
.inner
.get("sheet_name")
.map(|val| val.to_owned());

let has_header = storage_options
.inner
.get("has_header")
Expand All @@ -985,6 +990,7 @@ impl<'a> SessionPlanner<'a> {
file_type: None,
compression: None,
sheet_name,
table_name,
has_header,
}
.into()
Expand Down

0 comments on commit 43be262

Please sign in to comment.