diff --git a/table.go b/table.go index 2202e1517..234ee2efa 100644 --- a/table.go +++ b/table.go @@ -939,12 +939,8 @@ type parquetRowWriter struct { type parquetRowWriterOption func(p *parquetRowWriter) // rowWriter returns a new Parquet row writer with the given dynamic columns. -func (t *TableBlock) rowWriter(writer io.Writer, dynCols map[string][]string, options ...parquetRowWriterOption) (*parquetRowWriter, error) { - w, err := t.table.schema.NewWriter(writer, dynCols, false) - if err != nil { - return nil, err - } - +// TODO(asubiotto): Can we delete this parquetRowWriter? +func (t *TableBlock) rowWriter(w ParquetWriter, options ...parquetRowWriterOption) (*parquetRowWriter, error) { buffSize := 256 if t.table.config.RowGroupSize > 0 { buffSize = int(t.table.config.RowGroupSize) @@ -1199,7 +1195,12 @@ func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error) return 0, err } err = func() error { - p, err := t.active.rowWriter(w, merged.DynamicColumns()) + pw, err := t.schema.GetWriter(w, merged.DynamicColumns(), false) + if err != nil { + return err + } + defer t.schema.PutWriter(pw) + p, err := t.active.rowWriter(pw) if err != nil { return err } diff --git a/table_test.go b/table_test.go index a2895f333..dc1e9c5fe 100644 --- a/table_test.go +++ b/table_test.go @@ -739,9 +739,12 @@ func Test_RowWriter(t *testing.T) { defer c.Close() b := &bytes.Buffer{} - rowWriter, err := table.ActiveBlock().rowWriter(b, map[string][]string{ + pw, err := table.schema.GetWriter(b, map[string][]string{ "labels": {"node"}, - }) + }, false) + defer table.schema.PutWriter(pw) + require.NoError(t, err) + rowWriter, err := table.ActiveBlock().rowWriter(pw) require.NoError(t, err) // Write 17(8,9) rows, expect 3 row groups of 5 rows and 1 row group of 2 rows