From afd01eb9b06c8f00b1b2b499554d0ffc179e86c9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 15 Jan 2025 15:57:19 +0800 Subject: [PATCH 1/6] save --- proto/data.proto | 2 + src/common/src/array/arrow/arrow_impl.rs | 50 ++ src/common/src/array/chrono_array.rs | 4 +- src/common/src/array/mod.rs | 3 +- src/common/src/array/primitive_array.rs | 1 + src/common/src/array/proto_reader.rs | 1 + src/common/src/hash/key.rs | 22 +- src/common/src/test_utils/rand_array.rs | 9 +- src/common/src/test_utils/rand_chunk.rs | 7 +- src/common/src/types/datetime.rs | 770 ++++++++++++----------- src/common/src/types/macros.rs | 1 + src/common/src/types/mod.rs | 27 +- src/common/src/types/postgres_type.rs | 2 + src/common/src/types/scalar_impl.rs | 22 + src/common/src/types/to_sql.rs | 4 +- src/common/src/util/memcmp_encoding.rs | 4 +- 16 files changed, 537 insertions(+), 392 deletions(-) diff --git a/proto/data.proto b/proto/data.proto index 9bb15ebcc8d62..1362be3a05eb7 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -53,6 +53,7 @@ message DataType { SERIAL = 19; INT256 = 20; MAP = 21; + TIMESTAMP_NANO = 22; } TypeName type_name = 1; // Data length for char. @@ -104,6 +105,7 @@ enum ArrayType { SERIAL = 17; INT256 = 18; MAP = 20; + TIMESTAMP_NANO = 21; } message Array { diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index edf83f0a6f54c..a72b6bff2ab73 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -107,6 +107,7 @@ pub trait ToArrow { ArrayImpl::Date(array) => self.date_to_arrow(array), ArrayImpl::Time(array) => self.time_to_arrow(array), ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array), + ArrayImpl::TimestampNano(array) => self.timestampnano_to_arrow(array), ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array), ArrayImpl::Interval(array) => self.interval_to_arrow(array), ArrayImpl::Utf8(array) => self.utf8_to_arrow(array), @@ -180,6 +181,16 @@ pub trait ToArrow { ))) } + #[inline] + fn timestampnano_to_arrow( + &self, + array: &TimestampNanoArray, + ) -> Result { + Ok(Arc::new(arrow_array::TimestampNanosecondArray::from( + array, + ))) + } + #[inline] fn timestamptz_to_arrow( &self, @@ -319,6 +330,7 @@ pub trait ToArrow { DataType::Date => self.date_type_to_arrow(), DataType::Time => self.time_type_to_arrow(), DataType::Timestamp => self.timestamp_type_to_arrow(), + DataType::TimestampNano => self.timestampnano_type_to_arrow(), DataType::Timestamptz => self.timestamptz_type_to_arrow(), DataType::Interval => self.interval_type_to_arrow(), DataType::Varchar => self.varchar_type_to_arrow(), @@ -382,6 +394,10 @@ pub trait ToArrow { fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType { arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) } + #[inline] + fn timestampnano_type_to_arrow(&self) -> arrow_schema::DataType { + arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None) + } #[inline] fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType { @@ -1046,6 +1062,11 @@ converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); +converts_with_timeunit!(TimestampNanoArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestampNanoArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestampNanoArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestampNanoArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + /// Converts RisingWave value from and into Arrow value. trait FromIntoArrow { /// The corresponding element type in the Arrow array. @@ -1162,6 +1183,35 @@ impl FromIntoArrowWithUnit for Timestamp { } } +impl FromIntoArrowWithUnit for TimestampNano { + type ArrowType = i64; + type TimestampType = TimeUnit; + + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => { + TimestampNano(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) + } + TimeUnit::Millisecond => { + TimestampNano(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) + } + TimeUnit::Microsecond => { + TimestampNano(DateTime::from_timestamp_micros(value).unwrap().naive_utc()) + } + TimeUnit::Nanosecond => TimestampNano(DateTime::from_timestamp_nanos(value).naive_utc()), + } + } + + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.0.and_utc().timestamp(), + TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(), + TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(), + TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(), + } + } +} + impl FromIntoArrowWithUnit for Timestamptz { type ArrowType = i64; type TimestampType = TimeUnit; diff --git a/src/common/src/array/chrono_array.rs b/src/common/src/array/chrono_array.rs index 0e6c9f523d9f7..9c0753f4c180b 100644 --- a/src/common/src/array/chrono_array.rs +++ b/src/common/src/array/chrono_array.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::{PrimitiveArray, PrimitiveArrayBuilder}; +use super::{PrimitiveArray, PrimitiveArrayBuilder, TimestampNano}; use crate::types::{Date, Time, Timestamp, Timestamptz}; pub type DateArray = PrimitiveArray; pub type TimeArray = PrimitiveArray