Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL Analysis: Preliminary changes for Table rewrite #13679

Merged
merged 11 commits into from
Nov 19, 2024
348 changes: 166 additions & 182 deletions Framework/Core/include/Framework/ASoA.h

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Framework/Core/include/Framework/ASoAHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
return groupedIndices;
}

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
selectedRows = table.getSelectedRows(); // vector<int64_t>
}

Expand All @@ -111,7 +111,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
});

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (selectedRows[ind] >= selInd + chunkLength) {
selInd += chunkLength;
continue; // Go to the next chunk, no value selected in this chunk
Expand All @@ -120,7 +120,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol

uint64_t ai = 0;
while (ai < chunkLength) {
if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
ai += selectedRows[ind] - selInd;
selInd = selectedRows[ind];
}
Expand All @@ -132,7 +132,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
ind++;

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (ind >= selectedRows.size()) {
break;
}
Expand All @@ -141,7 +141,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
}

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (ind == selectedRows.size()) {
break;
}
Expand Down Expand Up @@ -1348,7 +1348,7 @@ auto combinations(const BP& binningPolicy, int categoryNeighbours, const T1& out
}
}

template <typename... T2s>
template <soa::is_table... T2s>
auto combinations(const o2::framework::expressions::Filter& filter, const T2s&... tables)
{
if constexpr (isSameType<T2s...>()) {
Expand All @@ -1366,7 +1366,7 @@ CombinationsGenerator<P2<T2s...>> combinations(const P2<T2s...>& policy)
return CombinationsGenerator<P2<T2s...>>(policy);
}

template <template <typename...> typename P2, typename... T2s>
template <template <typename...> typename P2, soa::is_table... T2s>
CombinationsGenerator<P2<Filtered<T2s>...>> combinations(P2<T2s...>&&, const o2::framework::expressions::Filter& filter, const T2s&... tables)
{
return CombinationsGenerator<P2<Filtered<T2s>...>>(P2<Filtered<T2s>...>(tables.select(filter)...));
Expand Down
32 changes: 15 additions & 17 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct WritingCursor<soa::Table<ORIGIN, PC...>> {
template <typename T>
static decltype(auto) extract(T const& arg)
{
if constexpr (soa::is_soa_iterator_v<T>) {
if constexpr (requires(T t) { t.globalIndex(); }) {
return arg.globalIndex();
} else {
static_assert(!framework::has_type<T>(framework::pack<PC...>{}), "Argument type mismatch");
Expand All @@ -104,6 +104,7 @@ struct WritingCursor<soa::Table<ORIGIN, PC...>> {

/// Helper to define output for a Table
template <typename T>
requires soa::is_table<T> || soa::is_iterator<T>
struct OutputForTable {
using table_t = T;
using metadata = typename aod::MetadataTrait<table_t>::metadata;
Expand Down Expand Up @@ -243,16 +244,15 @@ namespace
template <typename T, typename Key>
inline std::shared_ptr<arrow::ChunkedArray> getIndexToKey(arrow::Table* table)
{
using IC = framework::pack_element_t<framework::has_type_at_conditional<soa::is_binding_compatible, Key>(typename T::external_index_columns_t{}), typename T::external_index_columns_t>;
return table->column(framework::has_type_at<IC>(typename T::persistent_columns_t{}));
using IC = framework::pack_element_t<framework::has_type_at_conditional_v<soa::is_binding_compatible, Key>(typename T::external_index_columns_t{}), typename T::external_index_columns_t>;
return table->column(framework::has_type_at_v<IC>(typename T::persistent_columns_t{}));
}

template <typename C>
template <soa::is_column C>
struct ColumnTrait {
static_assert(framework::is_base_of_template_v<o2::soa::Column, C>, "Not a column type!");
using column_t = C;

static constexpr auto listSize()
static consteval auto listSize()
{
if constexpr (std::is_same_v<typename C::type, std::vector<int>>) {
return -1;
Expand Down Expand Up @@ -483,14 +483,14 @@ struct Service {
}
};

template <typename T>
auto getTableFromFilter(const T& table, soa::SelectionVector&& selection)
auto getTableFromFilter(soa::is_filtered_table auto const& table, soa::SelectionVector&& selection)
{
if constexpr (soa::is_soa_filtered_v<std::decay_t<T>>) {
return std::make_unique<o2::soa::Filtered<T>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
} else {
return std::make_unique<o2::soa::Filtered<T>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
}
return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
}

auto getTableFromFilter(soa::is_not_filtered_table auto const& table, soa::SelectionVector&& selection)
{
return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
}

void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter);
Expand Down Expand Up @@ -611,20 +611,18 @@ struct Partition {
namespace o2::soa
{
/// On-the-fly adding of expression columns
template <typename T, typename... Cs>
template <soa::is_table T, soa::is_spawnable_column... Cs>
auto Extend(T const& table)
{
static_assert((soa::is_type_spawnable_v<Cs> && ...), "You can only extend a table with expression columns");
using output_t = Join<T, soa::Table<o2::framework::OriginEnc{"JOIN"}, Cs...>>;
return output_t{{o2::framework::spawner<o2::framework::OriginEnc{"JOIN"}>(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension"), table.asArrowTable()}, 0};
}

/// Template function to attach dynamic columns on-the-fly (e.g. inside
/// process() function). Dynamic columns need to be compatible with the table.
template <typename T, typename... Cs>
template <soa::is_table T, soa::is_dynamic_column... Cs>
auto Attach(T const& table)
{
static_assert((framework::is_base_of_template_v<o2::soa::DynamicColumn, Cs> && ...), "You can only attach dynamic columns");
using output_t = Join<T, o2::soa::Table<o2::framework::OriginEnc{"JOIN"}, Cs...>>;
return output_t{{table.asArrowTable()}, table.offset()};
}
Expand Down
49 changes: 6 additions & 43 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,43 +417,6 @@ struct OutputManager<Builds<T>> {
}
};

template <typename T>
class has_instance
{
using one = char;
struct two {
char x[2];
};

template <typename C>
static one test(decltype(&C::instance));
template <typename C>
static two test(...);

public:
enum { value = sizeof(test<T>(nullptr)) == sizeof(char) };
};

template <typename T>
class has_end_of_stream
{
using one = char;
struct two {
char x[2];
};

template <typename C>
static one test(decltype(&C::endOfStream));
template <typename C>
static two test(...);

public:
enum { value = sizeof(test<T>(nullptr)) == sizeof(char) };
};

template <typename T>
inline constexpr bool has_end_of_stream_v = has_end_of_stream<T>::value;

template <typename T>
struct ServiceManager {
template <typename ANY>
Expand All @@ -477,7 +440,7 @@ struct ServiceManager {

template <typename T>
struct ServiceManager<Service<T>> {
static bool add(std::vector<ServiceSpec>& specs, Service<T>& service)
static bool add(std::vector<ServiceSpec>& specs, Service<T>& /*service*/)
{
if constexpr (o2::framework::is_base_of_template_v<LoadableServicePlugin, T>) {
T p = T{};
Expand All @@ -489,7 +452,7 @@ struct ServiceManager<Service<T>> {

static bool prepare(InitContext& context, Service<T>& service)
{
if constexpr (has_instance<T>::value) {
if constexpr (requires { &T::instance; }) {
service.service = &(T::instance()); // Sigh...
return true;
} else {
Expand All @@ -500,11 +463,11 @@ struct ServiceManager<Service<T>> {
}

/// If a service has a method endOfStream, it is called at the end of the stream.
static bool postRun(EndOfStreamContext& context, Service<T>& service)
static bool postRun(EndOfStreamContext& /*context*/, Service<T>& service)
{
// FIXME: for the moment we only need endOfStream to be
// stateless. In the future we might want to pass it EndOfStreamContext
if constexpr (has_end_of_stream_v<T>) {
if constexpr (requires { &T::endOfStream; }) {
service.service->endOfStream();
return true;
}
Expand Down Expand Up @@ -637,7 +600,7 @@ struct SpawnManager {
static bool requestInputs(std::vector<InputSpec>&, T const&) { return false; }
};

template <typename TABLE>
template <soa::is_table TABLE>
struct SpawnManager<Spawns<TABLE>> {
static bool requestInputs(std::vector<InputSpec>& inputs, Spawns<TABLE>& spawns)
{
Expand All @@ -656,7 +619,7 @@ struct IndexManager {
static bool requestInputs(std::vector<InputSpec>&, T const&) { return false; };
};

template <typename IDX>
template <soa::is_index_table IDX>
struct IndexManager<Builds<IDX>> {
static bool requestInputs(std::vector<InputSpec>& inputs, Builds<IDX>& builds)
{
Expand Down
Loading
Loading