From 484d2165443e59e670f52eca904e85d0af3efc88 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Tue, 28 Nov 2023 11:49:09 +0100 Subject: [PATCH] *: don't call schema.Fields which allocates --- cmd/parquet-tool/cmd/util.go | 3 ++- dynparquet/example.go | 2 +- pqarrow/arrow.go | 7 ++++--- pqarrow/builder/recordbuilder.go | 12 +++++++----- pqarrow/parquet.go | 3 ++- query/physicalplan/aggregate.go | 3 ++- query/physicalplan/distinct.go | 3 ++- query/physicalplan/ordered_aggregate.go | 3 ++- query/physicalplan/ordered_synchronizer.go | 6 ++++-- query/physicalplan/project.go | 16 ++++++++++------ table.go | 4 ++-- 11 files changed, 38 insertions(+), 24 deletions(-) diff --git a/cmd/parquet-tool/cmd/util.go b/cmd/parquet-tool/cmd/util.go index 9e4b6edfa..3d0d09435 100644 --- a/cmd/parquet-tool/cmd/util.go +++ b/cmd/parquet-tool/cmd/util.go @@ -63,7 +63,8 @@ func inspectRecord(record arrow.Record, columns []string) { } else { fields := make([]arrow.Field, 0, len(columns)) cols := make([]arrow.Array, 0, len(columns)) - for i, field := range record.Schema().Fields() { + for i := 0; i < record.Schema().NumFields(); i++ { + field := record.Schema().Field(i) for _, col := range columns { if col == field.Name { fields = append(fields, field) diff --git a/dynparquet/example.go b/dynparquet/example.go index c73903bac..7847565f0 100644 --- a/dynparquet/example.go +++ b/dynparquet/example.go @@ -98,7 +98,7 @@ func (s Samples) ToRecord() (arrow.Record, error) { bld := array.NewRecordBuilder(memory.NewGoAllocator(), schema) defer bld.Release() - numLabels := len(schema.Fields()) - 4 + numLabels := schema.NumFields() - 4 for _, sample := range s { if err := bld.Field(0).(*array.BinaryDictionaryBuilder).Append([]byte(sample.ExampleType)); err != nil { diff --git a/pqarrow/arrow.go b/pqarrow/arrow.go index 036f1ee5b..c60447633 100644 --- a/pqarrow/arrow.go +++ b/pqarrow/arrow.go @@ -240,7 +240,7 @@ func (c *ParquetConverter) Convert(ctx context.Context, rg parquet.RowGroup) err return err } // If the schema has no fields we simply ignore this RowGroup that has no data. - if len(schema.Fields()) == 0 { + if schema.NumFields() == 0 { return nil } @@ -615,7 +615,7 @@ func rowBasedParquetRowGroupToArrowRecord( ) error { parquetFields := rg.Schema().Fields() - if len(schema.Fields()) != len(parquetFields) { + if schema.NumFields() != len(parquetFields) { return fmt.Errorf("inconsistent schema between arrow and parquet") } @@ -1151,7 +1151,8 @@ func mergeArrowSchemas(schemas []*arrow.Schema) *arrow.Schema { fieldsMap := make(map[string]arrow.Field) for _, schema := range schemas { - for _, f := range schema.Fields() { + for i := 0; i < schema.NumFields(); i++ { + f := schema.Field(i) if _, ok := fieldsMap[f.Name]; !ok { fieldNames = append(fieldNames, f.Name) fieldsMap[f.Name] = f diff --git a/pqarrow/builder/recordbuilder.go b/pqarrow/builder/recordbuilder.go index 4cea2a48a..b787b1ac5 100644 --- a/pqarrow/builder/recordbuilder.go +++ b/pqarrow/builder/recordbuilder.go @@ -28,11 +28,11 @@ func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder refCount: 1, mem: mem, schema: schema, - fields: make([]ColumnBuilder, len(schema.Fields())), + fields: make([]ColumnBuilder, schema.NumFields()), } - for i, f := range schema.Fields() { - b.fields[i] = NewBuilder(mem, f.Type) + for i := 0; i < schema.NumFields(); i++ { + b.fields[i] = NewBuilder(mem, schema.Field(i).Type) } return b @@ -97,9 +97,11 @@ func (b *RecordBuilder) NewRecord() arrow.Record { // ExpandSchema expands the record builder schema by adding new fields. func (b *RecordBuilder) ExpandSchema(schema *arrow.Schema) { - for i, f := range schema.Fields() { + for i := 0; i < schema.NumFields(); i++ { + f := schema.Field(i) found := false - for _, old := range b.schema.Fields() { + for j := 0; j < b.schema.NumFields(); j++ { + old := b.schema.Field(j) if f.Equal(old) { found = true break diff --git a/pqarrow/parquet.go b/pqarrow/parquet.go index f7c406f8b..7e20e2bd8 100644 --- a/pqarrow/parquet.go +++ b/pqarrow/parquet.go @@ -151,7 +151,8 @@ func recordToRows( func RecordDynamicCols(record arrow.Record) map[string][]string { dyncols := map[string][]string{} - for _, af := range record.Schema().Fields() { + for i := 0; i < record.Schema().NumFields(); i++ { + af := record.Schema().Field(i) parts := strings.SplitN(af.Name, ".", 2) if len(parts) == 2 { // dynamic column dyncols[parts[0]] = append(dyncols[parts[0]], parts[1]) diff --git a/query/physicalplan/aggregate.go b/query/physicalplan/aggregate.go index 321f4571d..bd4b248b4 100644 --- a/query/physicalplan/aggregate.go +++ b/query/physicalplan/aggregate.go @@ -328,7 +328,8 @@ func (a *HashAggregate) Callback(ctx context.Context, r arrow.Record) error { concreteAggregateFieldsFound := 0 dynamicAggregateFieldsFound := 0 - for i, field := range r.Schema().Fields() { + for i := 0; i < r.Schema().NumFields(); i++ { + field := r.Schema().Field(i) for _, matcher := range a.groupByColumnMatchers { if matcher.MatchColumn(field.Name) { groupByFields = append(groupByFields, field) diff --git a/query/physicalplan/distinct.go b/query/physicalplan/distinct.go index cd49942e3..fc39bf1f7 100644 --- a/query/physicalplan/distinct.go +++ b/query/physicalplan/distinct.go @@ -76,7 +76,8 @@ func (d *Distinction) Callback(ctx context.Context, r arrow.Record) error { distinctFieldHashes := make([]uint64, 0, 10) distinctArrays := make([]arrow.Array, 0, 10) - for i, field := range r.Schema().Fields() { + for i := 0; i < r.Schema().NumFields(); i++ { + field := r.Schema().Field(i) for _, col := range d.columns { if col.MatchColumn(field.Name) { distinctFields = append(distinctFields, field) diff --git a/query/physicalplan/ordered_aggregate.go b/query/physicalplan/ordered_aggregate.go index 47e30fe15..ad973ec90 100644 --- a/query/physicalplan/ordered_aggregate.go +++ b/query/physicalplan/ordered_aggregate.go @@ -175,7 +175,8 @@ func (a *OrderedAggregate) Callback(_ context.Context, r arrow.Record) error { var columnToAggregate arrow.Array aggregateFieldFound := false foundNewColumns := false - for i, field := range r.Schema().Fields() { + for i := 0; i < r.Schema().NumFields(); i++ { + field := r.Schema().Field(i) for _, matcher := range a.groupByColumnMatchers { if matcher.MatchColumn(field.Name) { a.scratch.groupByMap[field.Name] = groupColInfo{field: field, arr: r.Column(i)} diff --git a/query/physicalplan/ordered_synchronizer.go b/query/physicalplan/ordered_synchronizer.go index 10e1f43ce..ccf7abed4 100644 --- a/query/physicalplan/ordered_synchronizer.go +++ b/query/physicalplan/ordered_synchronizer.go @@ -157,7 +157,8 @@ func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error { for i, orderCol := range o.orderByExprs { orderCols[i] = make(map[string]arrow.Field) for _, r := range records { - for _, field := range r.Schema().Fields() { + for j := 0; j < r.Schema().NumFields(); j++ { + field := r.Schema().Field(j) if ok := orderCol.MatchColumn(field.Name); ok { orderCols[i][field.Name] = field } else { @@ -213,7 +214,8 @@ func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error { } var columns []arrow.Array - for _, field := range schema.Fields() { + for j := 0; j < schema.NumFields(); j++ { + field := schema.Field(j) if otherFields := otherSchema.FieldIndices(field.Name); otherFields != nil { if len(otherFields) > 1 { fieldsFound, _ := otherSchema.FieldsByName(field.Name) diff --git a/query/physicalplan/project.go b/query/physicalplan/project.go index 1dbf69307..f86570217 100644 --- a/query/physicalplan/project.go +++ b/query/physicalplan/project.go @@ -45,7 +45,8 @@ func (a aliasProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow } return fields, array, nil case *logicalplan.Column: - for i, field := range ar.Schema().Fields() { + for i := 0; i < ar.Schema().NumFields(); i++ { + field := ar.Schema().Field(i) if a.expr.MatchColumn(field.Name) { field.Name = a.name ar.Column(i).Retain() // Retain the column since we're keeping it. @@ -134,7 +135,8 @@ func (p plainProjection) Name() string { } func (p plainProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow.Field, []arrow.Array, error) { - for i, field := range ar.Schema().Fields() { + for i := 0; i < ar.Schema().NumFields(); i++ { + field := ar.Schema().Field(i) if p.expr.MatchColumn(field.Name) { ar.Column(i).Retain() // Retain the column since we're keeping it. return []arrow.Field{field}, []arrow.Array{ar.Column(i)}, nil @@ -155,7 +157,8 @@ func (p dynamicProjection) Name() string { func (p dynamicProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow.Field, []arrow.Array, error) { fields := []arrow.Field{} arrays := []arrow.Array{} - for i, field := range ar.Schema().Fields() { + for i := 0; i < ar.Schema().NumFields(); i++ { + field := ar.Schema().Field(i) if p.expr.MatchColumn(field.Name) { fields = append(fields, field) arrays = append(arrays, ar.Column(i)) @@ -324,11 +327,12 @@ func (a *averageProjection) Project(mem memory.Allocator, r arrow.Record) ([]arr sums := r.Column(sumIndex[0]) counts := r.Column(countIndex[0]) - fields := make([]arrow.Field, 0, len(schema.Fields())-1) - columns := make([]arrow.Array, 0, len(schema.Fields())-1) + fields := make([]arrow.Field, 0, schema.NumFields()-1) + columns := make([]arrow.Array, 0, schema.NumFields()-1) // Only add the fields and columns that aren't the average's underlying sum and count columns. - for i, field := range schema.Fields() { + for i := 0; i < schema.NumFields(); i++ { + field := schema.Field(i) if i != sumIndex[0] && i != countIndex[0] { fields = append(fields, field) columns = append(columns, r.Column(i)) diff --git a/table.go b/table.go index 234ee2efa..75d5446e3 100644 --- a/table.go +++ b/table.go @@ -797,8 +797,8 @@ func (t *Table) SchemaIterator( switch t := rg.(type) { case arrow.Record: - for _, f := range t.Schema().Fields() { - b.Field(0).(*array.StringBuilder).Append(f.Name) + for i := 0; i < t.Schema().NumFields(); i++ { + b.Field(0).(*array.StringBuilder).Append(t.Schema().Field(i).Name) } record := b.NewRecord() err := callback(ctx, record)