Skip to content

Commit

Permalink
run formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
sahil1105 committed Aug 12, 2024
1 parent 67b1468 commit 226d2d1
Showing 1 changed file with 34 additions and 32 deletions.
66 changes: 34 additions & 32 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,40 +570,42 @@ struct CastingGenerator {
exec_ctx(std::make_shared<compute::ExecContext>(pool)) {}

Future<std::shared_ptr<RecordBatch>> operator()() {
return this->source_().Then(
[this](const std::shared_ptr<RecordBatch>& next) -> Result<std::shared_ptr<RecordBatch>> {
if (IsIterationEnd(next) || this->final_schema_.get() == nullptr) {
return next;
}
std::vector<std::shared_ptr<::arrow::Array>> out_cols;
std::vector<std::shared_ptr<arrow::Field>> out_schema_fields;

bool changed = false;
for (const auto& field : this->final_schema_->fields()) {
FieldRef field_ref = FieldRef(field->name());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> column, field_ref.GetOneOrNone(*next));
if (column) {
if (!column->type()->Equals(field->type())) {
// Referenced field was present but didn't have the expected type.
ARROW_ASSIGN_OR_RAISE(auto converted, compute::Cast(column, field->type(), compute::CastOptions::Safe(),
this->exec_ctx.get()));
column = converted.make_array();
changed = true;
}
out_cols.emplace_back(std::move(column));
out_schema_fields.emplace_back(field->Copy());
}
return this->source_().Then([this](const std::shared_ptr<RecordBatch>& next)
-> Result<std::shared_ptr<RecordBatch>> {
if (IsIterationEnd(next) || this->final_schema_.get() == nullptr) {
return next;
}
std::vector<std::shared_ptr<::arrow::Array>> out_cols;
std::vector<std::shared_ptr<arrow::Field>> out_schema_fields;

bool changed = false;
for (const auto& field : this->final_schema_->fields()) {
FieldRef field_ref = FieldRef(field->name());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> column,
field_ref.GetOneOrNone(*next));
if (column) {
if (!column->type()->Equals(field->type())) {
// Referenced field was present but didn't have the expected type.
ARROW_ASSIGN_OR_RAISE(
auto converted,
compute::Cast(column, field->type(), compute::CastOptions::Safe(),
this->exec_ctx.get()));
column = converted.make_array();
changed = true;
}
out_cols.emplace_back(std::move(column));
out_schema_fields.emplace_back(field->Copy());
}
}

if (changed) {
return RecordBatch::Make(
std::make_shared<Schema>(std::move(out_schema_fields),
next->schema()->metadata()),
next->num_rows(), std::move(out_cols));
} else {
return next;
}
});
if (changed) {
return RecordBatch::Make(std::make_shared<Schema>(std::move(out_schema_fields),
next->schema()->metadata()),
next->num_rows(), std::move(out_cols));
} else {
return next;
}
});
}

RecordBatchGenerator source_;
Expand Down

0 comments on commit 226d2d1

Please sign in to comment.