diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index f859ec9653f78..40d19d38e10ab 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -40,6 +40,7 @@ #include "arrow/util/crc32.h" #include "arrow/util/endian.h" #include "arrow/util/float16.h" +#include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding_internal.h" #include "arrow/util/type_traits.h" @@ -832,6 +833,9 @@ class ColumnWriterImpl { void FlushBufferedDataPages(); ColumnChunkMetaDataBuilder* metadata_; + // key_value_metadata_ for the column chunk + // It would be nullptr if there is no KeyValueMetadata set. + std::shared_ptr key_value_metadata_; const ColumnDescriptor* descr_; // scratch buffer if validity bits need to be recalculated. std::shared_ptr bits_buffer_; @@ -1100,6 +1104,7 @@ int64_t ColumnWriterImpl::Close() { if (rows_written_ > 0 && chunk_statistics.is_set()) { metadata_->SetStatistics(chunk_statistics); } + metadata_->SetKeyValueMetadata(key_value_metadata_); pager_->Close(has_dictionary_, fallback_); } @@ -1397,6 +1402,25 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return pages_change_on_record_boundaries_; } + void AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata) override { + if (closed_) { + throw ParquetException("Cannot add key-value metadata to closed column"); + } + if (key_value_metadata_ == nullptr) { + key_value_metadata_ = key_value_metadata; + } else if (key_value_metadata != nullptr) { + key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); + } + } + + void ResetKeyValueMetadata() override { + if (closed_) { + throw ParquetException("Cannot add key-value metadata to closed column"); + } + key_value_metadata_ = nullptr; + } + private: using ValueEncoderType = typename EncodingTraits::Encoder; using TypedStats = TypedStatistics; diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index a278670fa81c6..845bf9aa896bd 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/type_fwd.h" #include "arrow/util/compression.h" #include "parquet/exception.h" #include "parquet/platform.h" @@ -181,6 +182,17 @@ class PARQUET_EXPORT ColumnWriter { /// \brief The file-level writer properties virtual const WriterProperties* properties() = 0; + /// \brief Add key-value metadata to the ColumnChunk. + /// \param[in] key_value_metadata the metadata to add. + /// \note This will overwrite any existing metadata with the same key. + /// \throw ParquetException if Close() has been called. + virtual void AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata) = 0; + + /// \brief Reset the ColumnChunk key-value metadata. + /// \throw ParquetException if Close() has been called. + virtual void ResetKeyValueMetadata() = 0; + /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns /// error status if the array data type is not compatible with the concrete /// writer type. diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index c99efd17961aa..d2b3aa0dff003 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -23,10 +23,12 @@ #include #include "arrow/io/buffered.h" +#include "arrow/io/file.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_builders.h" #include "arrow/util/config.h" +#include "arrow/util/key_value_metadata.h" #include "parquet/column_page.h" #include "parquet/column_reader.h" @@ -51,6 +53,9 @@ using schema::PrimitiveNode; namespace test { +using ::testing::IsNull; +using ::testing::NotNull; + // The default size used in most tests. const int SMALL_SIZE = 100; #ifdef PARQUET_VALGRIND @@ -385,6 +390,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { return metadata_accessor->encoding_stats(); } + std::shared_ptr metadata_key_value_metadata() { + // Metadata accessor must be created lazily. + // This is because the ColumnChunkMetaData semantics dictate the metadata object is + // complete (no changes to the metadata buffer can be made after instantiation) + auto metadata_accessor = + ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); + return metadata_accessor->key_value_metadata(); + } + protected: int64_t values_read_; // Keep the reader alive as for ByteArray the lifetime of the ByteArray @@ -1705,5 +1719,60 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) { } } +using TestInt32Writer = TestPrimitiveWriter; + +TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) { + auto writer = this->BuildWriter(); + writer->Close(); + auto key_value_metadata = metadata_key_value_metadata(); + ASSERT_THAT(key_value_metadata, IsNull()); +} + +TEST_F(TestInt32Writer, WriteKeyValueMetadata) { + auto writer = this->BuildWriter(); + writer->AddKeyValueMetadata( + KeyValueMetadata::Make({"hello", "bye"}, {"world", "earth"})); + // overwrite the previous value + writer->AddKeyValueMetadata(KeyValueMetadata::Make({"bye"}, {"moon"})); + writer->Close(); + auto key_value_metadata = metadata_key_value_metadata(); + ASSERT_THAT(key_value_metadata, NotNull()); + ASSERT_EQ(2, key_value_metadata->size()); + ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("hello")); + ASSERT_EQ("world", value); + ASSERT_OK_AND_ASSIGN(value, key_value_metadata->Get("bye")); + ASSERT_EQ("moon", value); +} + +TEST_F(TestInt32Writer, ResetKeyValueMetadata) { + auto writer = this->BuildWriter(); + writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"})); + writer->ResetKeyValueMetadata(); + writer->Close(); + auto key_value_metadata = metadata_key_value_metadata(); + ASSERT_THAT(key_value_metadata, IsNull()); +} + +TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) { + auto sink = CreateOutputStream(); + { + auto file_writer = ParquetFileWriter::Open( + sink, std::dynamic_pointer_cast(schema_.schema_root())); + auto rg_writer = file_writer->AppendRowGroup(); + auto col_writer = rg_writer->NextColumn(); + col_writer->AddKeyValueMetadata(KeyValueMetadata::Make({"foo"}, {"bar"})); + file_writer->Close(); + } + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + auto file_reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + auto key_value_metadata = + file_reader->metadata()->RowGroup(0)->ColumnChunk(0)->key_value_metadata(); + ASSERT_THAT(key_value_metadata, NotNull()); + ASSERT_EQ(1U, key_value_metadata->size()); + ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("foo")); + ASSERT_EQ("bar", value); +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 10c8afaf37507..4f2aa6e37328c 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -135,6 +135,39 @@ std::shared_ptr MakeColumnStats(const format::ColumnMetaData& meta_d throw ParquetException("Can't decode page statistics for selected column type"); } +// Get KeyValueMetadata from parquet Thrift RowGroup or ColumnChunk metadata. +// +// Returns nullptr if the metadata is not set. +template +std::shared_ptr FromThriftKeyValueMetadata(const Metadata& source) { + std::shared_ptr metadata = nullptr; + if (source.__isset.key_value_metadata) { + std::vector keys; + std::vector values; + keys.reserve(source.key_value_metadata.size()); + values.reserve(source.key_value_metadata.size()); + for (const auto& it : source.key_value_metadata) { + keys.push_back(it.key); + values.push_back(it.value); + } + metadata = std::make_shared(std::move(keys), std::move(values)); + } + return metadata; +} + +template +void ToThriftKeyValueMetadata(const KeyValueMetadata& source, Metadata* metadata) { + std::vector key_value_metadata; + key_value_metadata.reserve(static_cast(source.size())); + for (int64_t i = 0; i < source.size(); ++i) { + format::KeyValue kv_pair; + kv_pair.__set_key(source.key(i)); + kv_pair.__set_value(source.value(i)); + key_value_metadata.emplace_back(std::move(kv_pair)); + } + metadata->__set_key_value_metadata(std::move(key_value_metadata)); +} + // MetaData Accessor // ColumnCryptoMetaData @@ -233,6 +266,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { encoding_stats.count}); } possible_stats_ = nullptr; + InitKeyValueMetadata(); } bool Equals(const ColumnChunkMetaDataImpl& other) const { @@ -343,7 +377,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return std::nullopt; } + const std::shared_ptr& key_value_metadata() const { + return key_value_metadata_; + } + private: + void InitKeyValueMetadata() { + key_value_metadata_ = FromThriftKeyValueMetadata(*column_metadata_); + } + mutable std::shared_ptr possible_stats_; std::vector encodings_; std::vector encoding_stats_; @@ -353,6 +395,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { const ColumnDescriptor* descr_; const ReaderProperties properties_; const ApplicationVersion* writer_version_; + std::shared_ptr key_value_metadata_; }; std::unique_ptr ColumnChunkMetaData::Make( @@ -471,6 +514,11 @@ bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const { return impl_->Equals(*other.impl_); } +const std::shared_ptr& ColumnChunkMetaData::key_value_metadata() + const { + return impl_->key_value_metadata(); +} + // row-group metadata class RowGroupMetaData::RowGroupMetaDataImpl { public: @@ -913,7 +961,7 @@ class FileMetaData::FileMetaDataImpl { std::vector column_orders; if (metadata_->__isset.column_orders) { column_orders.reserve(metadata_->column_orders.size()); - for (auto column_order : metadata_->column_orders) { + for (auto& column_order : metadata_->column_orders) { if (column_order.__isset.TYPE_ORDER) { column_orders.push_back(ColumnOrder::type_defined_); } else { @@ -928,14 +976,7 @@ class FileMetaData::FileMetaDataImpl { } void InitKeyValueMetadata() { - std::shared_ptr metadata = nullptr; - if (metadata_->__isset.key_value_metadata) { - metadata = std::make_shared(); - for (const auto& it : metadata_->key_value_metadata) { - metadata->Append(it.key, it.value); - } - } - key_value_metadata_ = std::move(metadata); + key_value_metadata_ = FromThriftKeyValueMetadata(*metadata_); } }; @@ -1590,6 +1631,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings)); column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats)); + if (key_value_metadata_) { + ToThriftKeyValueMetadata(*key_value_metadata_, &column_chunk_->meta_data); + } + const auto& encrypt_md = properties_->column_encryption_properties(column_->path()->ToDotString()); // column is encrypted @@ -1656,6 +1701,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { return column_chunk_->meta_data.total_compressed_size; } + void SetKeyValueMetadata(std::shared_ptr key_value_metadata) { + key_value_metadata_ = std::move(key_value_metadata); + } + private: void Init(format::ColumnChunk* column_chunk) { column_chunk_ = column_chunk; @@ -1670,6 +1719,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { std::unique_ptr owned_column_chunk_; const std::shared_ptr properties_; const ColumnDescriptor* column_; + std::shared_ptr key_value_metadata_; }; std::unique_ptr ColumnChunkMetaDataBuilder::Make( @@ -1727,6 +1777,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) impl_->SetStatistics(result); } +void ColumnChunkMetaDataBuilder::SetKeyValueMetadata( + std::shared_ptr key_value_metadata) { + impl_->SetKeyValueMetadata(std::move(key_value_metadata)); +} + int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const { return impl_->total_compressed_size(); } @@ -1925,16 +1980,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { } else if (key_value_metadata) { key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); } - metadata_->key_value_metadata.clear(); - metadata_->key_value_metadata.reserve( - static_cast(key_value_metadata_->size())); - for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { - format::KeyValue kv_pair; - kv_pair.__set_key(key_value_metadata_->key(i)); - kv_pair.__set_value(key_value_metadata_->value(i)); - metadata_->key_value_metadata.push_back(std::move(kv_pair)); - } - metadata_->__isset.key_value_metadata = true; + ToThriftKeyValueMetadata(*key_value_metadata_, metadata_.get()); } int32_t file_version = 0; diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index e46297540ba6e..d1e2d1904a694 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -184,6 +184,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::unique_ptr crypto_metadata() const; std::optional GetColumnIndexLocation() const; std::optional GetOffsetIndexLocation() const; + const std::shared_ptr& key_value_metadata() const; private: explicit ColumnChunkMetaData( @@ -466,8 +467,12 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // column chunk // Used when a dataset is spread across multiple files void set_file_path(const std::string& path); + // column metadata void SetStatistics(const EncodedStatistics& stats); + + void SetKeyValueMetadata(std::shared_ptr key_value_metadata); + // get the column descriptor const ColumnDescriptor* descr() const; diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 33df5925a1cf1..60adfc697f95c 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -64,6 +64,25 @@ void PrintPageEncodingStats(std::ostream& stream, // the fixed initial size is just for an example #define COL_WIDTH 30 +void PutChars(std::ostream& stream, char c, int n) { + for (int i = 0; i < n; ++i) { + stream.put(c); + } +} + +void PrintKeyValueMetadata(std::ostream& stream, + const KeyValueMetadata& key_value_metadata, + int indent_level = 0, int indent_width = 1) { + const int64_t size_of_key_value_metadata = key_value_metadata.size(); + PutChars(stream, ' ', indent_level * indent_width); + stream << "Key Value Metadata: " << size_of_key_value_metadata << " entries\n"; + for (int64_t i = 0; i < size_of_key_value_metadata; i++) { + PutChars(stream, ' ', (indent_level + 1) * indent_width); + stream << "Key nr " << i << " " << key_value_metadata.key(i) << ": " + << key_value_metadata.value(i) << "\n"; + } +} + void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selected_columns, bool print_values, bool format_dump, bool print_key_value_metadata, const char* filename) { @@ -76,12 +95,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte if (print_key_value_metadata && file_metadata->key_value_metadata()) { auto key_value_metadata = file_metadata->key_value_metadata(); - int64_t size_of_key_value_metadata = key_value_metadata->size(); - stream << "Key Value File Metadata: " << size_of_key_value_metadata << " entries\n"; - for (int64_t i = 0; i < size_of_key_value_metadata; i++) { - stream << " Key nr " << i << " " << key_value_metadata->key(i) << ": " - << key_value_metadata->value(i) << "\n"; - } + PrintKeyValueMetadata(stream, *key_value_metadata); } stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n"; @@ -136,7 +150,11 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte std::shared_ptr stats = column_chunk->statistics(); const ColumnDescriptor* descr = file_metadata->schema()->Column(i); - stream << "Column " << i << std::endl << " Values: " << column_chunk->num_values(); + stream << "Column " << i << std::endl; + if (print_key_value_metadata && column_chunk->key_value_metadata()) { + PrintKeyValueMetadata(stream, *column_chunk->key_value_metadata(), 1, 2); + } + stream << " Values: " << column_chunk->num_values(); if (column_chunk->is_stats_set()) { std::string min = stats->EncodeMin(), max = stats->EncodeMax(); stream << ", Null Values: " << stats->null_count() diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 35d15227ee5dc..d6aebd8284f4a 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -328,6 +328,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: unique_ptr[CColumnCryptoMetaData] crypto_metadata() const optional[ParquetIndexLocation] GetColumnIndexLocation() const optional[ParquetIndexLocation] GetOffsetIndexLocation() const + shared_ptr[const CKeyValueMetadata] key_value_metadata() const struct CSortingColumn" parquet::SortingColumn": int column_idx diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 41b15b633d3d2..254bfe3b09a9c 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -508,6 +508,19 @@ cdef class ColumnChunkMetaData(_Weakrefable): """Whether the column chunk has a column index""" return self.metadata.GetColumnIndexLocation().has_value() + @property + def metadata(self): + """Additional metadata as key value pairs (dict[bytes, bytes]).""" + cdef: + unordered_map[c_string, c_string] metadata + const CKeyValueMetadata* underlying_metadata + underlying_metadata = self.metadata.key_value_metadata().get() + if underlying_metadata != NULL: + underlying_metadata.ToUnorderedMap(&metadata) + return metadata + else: + return None + cdef class SortingColumn: """ diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index 767e7f6b69d07..80605e973cda8 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. +import os +import pathlib + import pytest from pyarrow.util import guid @@ -25,6 +28,15 @@ def datadir(base_datadir): return base_datadir / 'parquet' +@pytest.fixture(scope='module') +def parquet_test_datadir(): + result = os.environ.get('PARQUET_TEST_DATA') + if not result: + raise RuntimeError('Please point the PARQUET_TEST_DATA environment ' + 'variable to the test data directory') + return pathlib.Path(result) + + @pytest.fixture def s3_bucket(s3_server): boto3 = pytest.importorskip('boto3') diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 528cf0110dd95..c29213ebc3d42 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -782,3 +782,12 @@ def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs): assert meta1.read_bytes() == meta2.read_bytes() \ == meta3.read_bytes() == meta4.read_bytes() \ == s3_fs.open(meta5).read() + + +def test_column_chunk_key_value_metadata(parquet_test_datadir): + metadata = pq.read_metadata(parquet_test_datadir / + 'column_chunk_key_value_metadata.parquet') + key_value_metadata1 = metadata.row_group(0).column(0).metadata + assert key_value_metadata1 == {b'foo': b'bar', b'thisiskeywithoutvalue': b''} + key_value_metadata2 = metadata.row_group(0).column(1).metadata + assert key_value_metadata2 is None