diff --git a/src/lib.rs b/src/lib.rs index fa644b4..100c80b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry}; mod arrow_parquet; mod parquet_copy_hook; mod parquet_udfs; +#[cfg(any(test, feature = "pg_test"))] +mod pgrx_tests; mod pgrx_utils; mod type_compat; @@ -34,4073 +36,6 @@ pub extern "C" fn _PG_init() { init_parquet_copy_hook(); } -#[cfg(any(test, feature = "pg_test"))] -#[pg_schema] -mod tests { - use std::fs::File; - use std::io::Write; - use std::marker::PhantomData; - use std::sync::Arc; - use std::vec; - use std::{collections::HashMap, fmt::Debug}; - - use crate::arrow_parquet::compression::PgParquetCompression; - use crate::type_compat::fallback_to_text::FallbackToText; - use crate::type_compat::geometry::Geometry; - use crate::type_compat::map::Map; - use crate::type_compat::pg_arrow_type_conversions::{ - date_to_i32, time_to_i64, timestamp_to_i64, timestamptz_to_i64, timetz_to_i64, - DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE, - }; - use arrow::array::{ - ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int8Array, LargeBinaryArray, LargeStringArray, - ListArray, MapArray, RecordBatch, StringArray, StructArray, Time64MicrosecondArray, - TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, - }; - use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; - use arrow::datatypes::UInt16Type; - use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; - use parquet::arrow::ArrowWriter; - use pgrx::pg_sys::Oid; - use pgrx::{ - composite_type, - datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, - pg_test, AnyNumeric, FromDatum, IntoDatum, Spi, - }; - enum CopyOptionValue { - StringOption(String), - IntOption(i64), - } - - fn comma_separated_copy_options(options: &HashMap) -> String { - let mut comma_sepated_options = String::new(); - - for (option_idx, (key, value)) in options.iter().enumerate() { - match value { - CopyOptionValue::StringOption(value) => { - comma_sepated_options.push_str(&format!("{} '{}'", key, value)); - } - CopyOptionValue::IntOption(value) => { - comma_sepated_options.push_str(&format!("{} {}", key, value)); - } - } - - if option_idx < options.len() - 1 { - comma_sepated_options.push_str(", "); - } - } - - comma_sepated_options - } - - struct TestTable { - uri: String, - order_by_col: String, - copy_to_options: HashMap, - copy_from_options: HashMap, - _data: PhantomData, - } - - impl TestTable { - fn new(typename: String) -> Self { - Spi::run("DROP TABLE IF EXISTS test_expected, test_result;").unwrap(); - - let create_table_command = format!("CREATE TABLE test_expected (a {});", &typename); - Spi::run(create_table_command.as_str()).unwrap(); - - let create_table_command = format!("CREATE TABLE test_result (a {});", &typename); - Spi::run(create_table_command.as_str()).unwrap(); - - let mut copy_to_options = HashMap::new(); - copy_to_options.insert( - "format".to_string(), - CopyOptionValue::StringOption("parquet".to_string()), - ); - - let mut copy_from_options = HashMap::new(); - copy_from_options.insert( - "format".to_string(), - CopyOptionValue::StringOption("parquet".to_string()), - ); - - let uri = "/tmp/test.parquet".to_string(); - - let order_by_col = "a".to_string(); - - Self { - uri, - order_by_col, - copy_to_options, - copy_from_options, - _data: PhantomData, - } - } - - fn with_order_by_col(mut self, order_by_col: String) -> Self { - self.order_by_col = order_by_col; - self - } - - fn with_copy_to_options( - mut self, - copy_to_options: HashMap, - ) -> Self { - self.copy_to_options = copy_to_options; - self - } - - fn with_copy_from_options( - mut self, - copy_from_options: HashMap, - ) -> Self { - self.copy_from_options = copy_from_options; - self - } - - fn with_uri(mut self, uri: String) -> Self { - self.uri = uri; - self - } - - fn insert(&self, insert_command: &str) { - Spi::run(insert_command).unwrap(); - } - - fn select_all(&self, table_name: &str) -> Vec<(Option,)> { - let select_command = format!( - "SELECT a FROM {} ORDER BY {};", - table_name, self.order_by_col - ); - - Spi::connect(|client| { - let mut results = Vec::new(); - let tup_table = client.select(&select_command, None, None).unwrap(); - - for row in tup_table { - let val = row["a"].value::(); - results.push((val.expect("could not select"),)); - } - - results - }) - } - - fn copy_to_parquet(&self) { - let mut copy_to_query = format!("COPY (SELECT a FROM test_expected) TO '{}'", self.uri); - - if !self.copy_to_options.is_empty() { - copy_to_query.push_str(" WITH ("); - - let options_str = comma_separated_copy_options(&self.copy_to_options); - copy_to_query.push_str(&options_str); - - copy_to_query.push(')'); - } - - copy_to_query.push(';'); - - Spi::run(copy_to_query.as_str()).unwrap(); - } - - fn copy_from_parquet(&self) { - let mut copy_from_query = format!("COPY test_result FROM '{}'", self.uri); - - if !self.copy_from_options.is_empty() { - copy_from_query.push_str(" WITH ("); - - let options_str = comma_separated_copy_options(&self.copy_from_options); - copy_from_query.push_str(&options_str); - - copy_from_query.push(')'); - } - - copy_from_query.push(';'); - - Spi::run(copy_from_query.as_str()).unwrap(); - } - } - - fn timetz_to_utc_time(timetz: TimeWithTimeZone) -> Option