Skip to content

Commit

Permalink
*: don't call schema.Fields which allocates
Browse files Browse the repository at this point in the history
  • Loading branch information
asubiotto committed Nov 28, 2023
1 parent aa82c94 commit 484d216
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 24 deletions.
3 changes: 2 additions & 1 deletion cmd/parquet-tool/cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func inspectRecord(record arrow.Record, columns []string) {
} else {
fields := make([]arrow.Field, 0, len(columns))
cols := make([]arrow.Array, 0, len(columns))
for i, field := range record.Schema().Fields() {
for i := 0; i < record.Schema().NumFields(); i++ {
field := record.Schema().Field(i)
for _, col := range columns {
if col == field.Name {
fields = append(fields, field)
Expand Down
2 changes: 1 addition & 1 deletion dynparquet/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s Samples) ToRecord() (arrow.Record, error) {
bld := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
defer bld.Release()

numLabels := len(schema.Fields()) - 4
numLabels := schema.NumFields() - 4

for _, sample := range s {
if err := bld.Field(0).(*array.BinaryDictionaryBuilder).Append([]byte(sample.ExampleType)); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions pqarrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (c *ParquetConverter) Convert(ctx context.Context, rg parquet.RowGroup) err
return err
}
// If the schema has no fields we simply ignore this RowGroup that has no data.
if len(schema.Fields()) == 0 {
if schema.NumFields() == 0 {
return nil
}

Expand Down Expand Up @@ -615,7 +615,7 @@ func rowBasedParquetRowGroupToArrowRecord(
) error {
parquetFields := rg.Schema().Fields()

if len(schema.Fields()) != len(parquetFields) {
if schema.NumFields() != len(parquetFields) {
return fmt.Errorf("inconsistent schema between arrow and parquet")
}

Expand Down Expand Up @@ -1151,7 +1151,8 @@ func mergeArrowSchemas(schemas []*arrow.Schema) *arrow.Schema {
fieldsMap := make(map[string]arrow.Field)

for _, schema := range schemas {
for _, f := range schema.Fields() {
for i := 0; i < schema.NumFields(); i++ {
f := schema.Field(i)
if _, ok := fieldsMap[f.Name]; !ok {
fieldNames = append(fieldNames, f.Name)
fieldsMap[f.Name] = f
Expand Down
12 changes: 7 additions & 5 deletions pqarrow/builder/recordbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder
refCount: 1,
mem: mem,
schema: schema,
fields: make([]ColumnBuilder, len(schema.Fields())),
fields: make([]ColumnBuilder, schema.NumFields()),
}

for i, f := range schema.Fields() {
b.fields[i] = NewBuilder(mem, f.Type)
for i := 0; i < schema.NumFields(); i++ {
b.fields[i] = NewBuilder(mem, schema.Field(i).Type)
}

return b
Expand Down Expand Up @@ -97,9 +97,11 @@ func (b *RecordBuilder) NewRecord() arrow.Record {

// ExpandSchema expands the record builder schema by adding new fields.
func (b *RecordBuilder) ExpandSchema(schema *arrow.Schema) {
for i, f := range schema.Fields() {
for i := 0; i < schema.NumFields(); i++ {
f := schema.Field(i)
found := false
for _, old := range b.schema.Fields() {
for j := 0; j < b.schema.NumFields(); j++ {
old := b.schema.Field(j)
if f.Equal(old) {
found = true
break
Expand Down
3 changes: 2 additions & 1 deletion pqarrow/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func recordToRows(

func RecordDynamicCols(record arrow.Record) map[string][]string {
dyncols := map[string][]string{}
for _, af := range record.Schema().Fields() {
for i := 0; i < record.Schema().NumFields(); i++ {
af := record.Schema().Field(i)
parts := strings.SplitN(af.Name, ".", 2)
if len(parts) == 2 { // dynamic column
dyncols[parts[0]] = append(dyncols[parts[0]], parts[1])
Expand Down
3 changes: 2 additions & 1 deletion query/physicalplan/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ func (a *HashAggregate) Callback(ctx context.Context, r arrow.Record) error {
concreteAggregateFieldsFound := 0
dynamicAggregateFieldsFound := 0

for i, field := range r.Schema().Fields() {
for i := 0; i < r.Schema().NumFields(); i++ {
field := r.Schema().Field(i)
for _, matcher := range a.groupByColumnMatchers {
if matcher.MatchColumn(field.Name) {
groupByFields = append(groupByFields, field)
Expand Down
3 changes: 2 additions & 1 deletion query/physicalplan/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (d *Distinction) Callback(ctx context.Context, r arrow.Record) error {
distinctFieldHashes := make([]uint64, 0, 10)
distinctArrays := make([]arrow.Array, 0, 10)

for i, field := range r.Schema().Fields() {
for i := 0; i < r.Schema().NumFields(); i++ {
field := r.Schema().Field(i)
for _, col := range d.columns {
if col.MatchColumn(field.Name) {
distinctFields = append(distinctFields, field)
Expand Down
3 changes: 2 additions & 1 deletion query/physicalplan/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (a *OrderedAggregate) Callback(_ context.Context, r arrow.Record) error {
var columnToAggregate arrow.Array
aggregateFieldFound := false
foundNewColumns := false
for i, field := range r.Schema().Fields() {
for i := 0; i < r.Schema().NumFields(); i++ {
field := r.Schema().Field(i)
for _, matcher := range a.groupByColumnMatchers {
if matcher.MatchColumn(field.Name) {
a.scratch.groupByMap[field.Name] = groupColInfo{field: field, arr: r.Column(i)}
Expand Down
6 changes: 4 additions & 2 deletions query/physicalplan/ordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error {
for i, orderCol := range o.orderByExprs {
orderCols[i] = make(map[string]arrow.Field)
for _, r := range records {
for _, field := range r.Schema().Fields() {
for j := 0; j < r.Schema().NumFields(); j++ {
field := r.Schema().Field(j)
if ok := orderCol.MatchColumn(field.Name); ok {
orderCols[i][field.Name] = field
} else {
Expand Down Expand Up @@ -213,7 +214,8 @@ func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error {
}

var columns []arrow.Array
for _, field := range schema.Fields() {
for j := 0; j < schema.NumFields(); j++ {
field := schema.Field(j)
if otherFields := otherSchema.FieldIndices(field.Name); otherFields != nil {
if len(otherFields) > 1 {
fieldsFound, _ := otherSchema.FieldsByName(field.Name)
Expand Down
16 changes: 10 additions & 6 deletions query/physicalplan/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (a aliasProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow
}
return fields, array, nil
case *logicalplan.Column:
for i, field := range ar.Schema().Fields() {
for i := 0; i < ar.Schema().NumFields(); i++ {
field := ar.Schema().Field(i)
if a.expr.MatchColumn(field.Name) {
field.Name = a.name
ar.Column(i).Retain() // Retain the column since we're keeping it.
Expand Down Expand Up @@ -134,7 +135,8 @@ func (p plainProjection) Name() string {
}

func (p plainProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow.Field, []arrow.Array, error) {
for i, field := range ar.Schema().Fields() {
for i := 0; i < ar.Schema().NumFields(); i++ {
field := ar.Schema().Field(i)
if p.expr.MatchColumn(field.Name) {
ar.Column(i).Retain() // Retain the column since we're keeping it.
return []arrow.Field{field}, []arrow.Array{ar.Column(i)}, nil
Expand All @@ -155,7 +157,8 @@ func (p dynamicProjection) Name() string {
func (p dynamicProjection) Project(mem memory.Allocator, ar arrow.Record) ([]arrow.Field, []arrow.Array, error) {
fields := []arrow.Field{}
arrays := []arrow.Array{}
for i, field := range ar.Schema().Fields() {
for i := 0; i < ar.Schema().NumFields(); i++ {
field := ar.Schema().Field(i)
if p.expr.MatchColumn(field.Name) {
fields = append(fields, field)
arrays = append(arrays, ar.Column(i))
Expand Down Expand Up @@ -324,11 +327,12 @@ func (a *averageProjection) Project(mem memory.Allocator, r arrow.Record) ([]arr
sums := r.Column(sumIndex[0])
counts := r.Column(countIndex[0])

fields := make([]arrow.Field, 0, len(schema.Fields())-1)
columns := make([]arrow.Array, 0, len(schema.Fields())-1)
fields := make([]arrow.Field, 0, schema.NumFields()-1)
columns := make([]arrow.Array, 0, schema.NumFields()-1)

// Only add the fields and columns that aren't the average's underlying sum and count columns.
for i, field := range schema.Fields() {
for i := 0; i < schema.NumFields(); i++ {
field := schema.Field(i)
if i != sumIndex[0] && i != countIndex[0] {
fields = append(fields, field)
columns = append(columns, r.Column(i))
Expand Down
4 changes: 2 additions & 2 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,8 @@ func (t *Table) SchemaIterator(

switch t := rg.(type) {
case arrow.Record:
for _, f := range t.Schema().Fields() {
b.Field(0).(*array.StringBuilder).Append(f.Name)
for i := 0; i < t.Schema().NumFields(); i++ {
b.Field(0).(*array.StringBuilder).Append(t.Schema().Field(i).Name)
}
record := b.NewRecord()
err := callback(ctx, record)
Expand Down

0 comments on commit 484d216

Please sign in to comment.