Skip to content

Commit

Permalink
Add fs fault injection in memory arbitrator fuzzer spill
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 11, 2024
1 parent c8c4707 commit dd9e503
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 48 deletions.
188 changes: 143 additions & 45 deletions velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@
#include <boost/random/uniform_int_distribution.hpp>

#include "velox/common/file/FileSystems.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h" // @manual
#include "velox/dwio/dwrf/RegisterDwrfWriter.h" // @manual
#include "velox/exec/MemoryReclaimer.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/functions/lib/aggregates/AverageAggregateBase.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
Expand Down Expand Up @@ -73,6 +71,21 @@ DEFINE_int32(

DEFINE_int64(arbitrator_capacity, 256L << 20, "Arbitrator capacity in bytes.");

DEFINE_double(
faulty_spill_ratio,
0.1,
"Chance of fault injection in file system during spill "
"(expressed as double from 0 to 1).");

DEFINE_int32(
faulty_spill_max_trigger_threshold,
200,
"Maximum number of invocations of file operation hooks before triggering "
"fault injection. The actual threshold count is taken as a random number "
"between 0 to this number");

using namespace facebook::velox::tests::utils;

namespace facebook::velox::exec::test {
namespace {

Expand All @@ -94,11 +107,13 @@ class MemoryArbitrationFuzzer {
size_t successCount{0};
size_t oomCount{0};
size_t abortCount{0};
size_t spillFsFaultCount{0};

void print() const {
std::stringstream ss;
ss << "success count = " << successCount << ", oom count = " << oomCount
<< ", abort count = " << abortCount;
<< ", abort count = " << abortCount
<< ", spill fs fault count = " << spillFsFaultCount;
LOG(INFO) << ss.str();
}
};
Expand All @@ -120,6 +135,31 @@ class MemoryArbitrationFuzzer {
return boost::random::uniform_int_distribution<int32_t>(min, max)(rng_);
}

// Scoped directory that, if enabled fault injection, makes sure the fault
// injection is cleared upon its destruction.
class ScopedFileDirectory {
public:
explicit ScopedFileDirectory(std::shared_ptr<TempDirectoryPath> directory)
: directory_(directory) {}
~ScopedFileDirectory() {
auto faultyFileSystem = std::dynamic_pointer_cast<FaultyFileSystem>(
filesystems::getFileSystem(directory_->getPath(), nullptr));
faultyFileSystem->clearFileFaultInjections();
};

std::shared_ptr<TempDirectoryPath> directory() {
return directory_;
}

private:
const std::shared_ptr<TempDirectoryPath> directory_;
};

std::shared_ptr<TempDirectoryPath> generateDirectory(
FileFaultInjectionHook&& injectionHook);

ScopedFileDirectory generateMaybeFaultySpillDirectory();

// Returns a list of randomly generated key types for join and aggregation.
std::vector<TypePtr> generateKeyTypes(int32_t numKeys);

Expand Down Expand Up @@ -671,10 +711,57 @@ MemoryArbitrationFuzzer::orderByPlans(const std::string& tableDir) {
return plans;
}

std::shared_ptr<TempDirectoryPath> MemoryArbitrationFuzzer::generateDirectory(
FileFaultInjectionHook&& injectionHook) {
if (injectionHook == nullptr) {
return exec::test::TempDirectoryPath::create(false);
}
const auto directory = exec::test::TempDirectoryPath::create(true);
auto faultyFileSystem = std::dynamic_pointer_cast<FaultyFileSystem>(
filesystems::getFileSystem(directory->getPath(), nullptr));
faultyFileSystem->setFileInjectionHook(std::move(injectionHook));
return directory;
}

MemoryArbitrationFuzzer::ScopedFileDirectory
MemoryArbitrationFuzzer::generateMaybeFaultySpillDirectory() {
FuzzerGenerator fsRng(rng_());
const auto injectFsFault = coinToss(fsRng, FLAGS_faulty_spill_ratio);
const auto injectFsCountThreshold =
getRandomIndex(fsRng, FLAGS_faulty_spill_max_trigger_threshold);
if (injectFsFault) {
using OpType = FaultFileOperation::Type;
static const std::vector<std::unordered_set<OpType>> opTypes{
{OpType::kRead},
{OpType::kReadv},
{OpType::kWrite},
{OpType::kRead, OpType::kReadv},
{OpType::kRead, OpType::kWrite},
{OpType::kReadv, OpType::kWrite}};

return ScopedFileDirectory(generateDirectory(
[this,
injectFsCountThreshold,
injectTypes = opTypes[getRandomIndex(fsRng, opTypes.size() - 1)]](
FaultFileOperation* op) {
static std::atomic<uint32_t> invocationCount{0};
if (injectTypes.count(op->type) == 0) {
return;
}
if (invocationCount.fetch_add(1) + 1 >= injectFsCountThreshold) {
++stats_.wlock()->spillFsFaultCount;
VELOX_FAIL(
"Fault file injection on {}",
FaultFileOperation::typeString(op->type));
}
}));
}
return ScopedFileDirectory(generateDirectory(nullptr));
}

void MemoryArbitrationFuzzer::verify() {
const auto outputDirectory = TempDirectoryPath::create();
const auto spillDirectory = exec::test::TempDirectoryPath::create();
const auto tableScanDir = exec::test::TempDirectoryPath::create();
auto scopedSpillDirectory = generateMaybeFaultySpillDirectory();
const auto tableScanDir = generateDirectory(nullptr);

std::vector<PlanWithSplits> plans;
for (const auto& plan : hashJoinPlans(tableScanDir->getPath())) {
Expand All @@ -700,45 +787,56 @@ void MemoryArbitrationFuzzer::verify() {
queryThreads.reserve(numThreads);
for (int i = 0; i < numThreads; ++i) {
auto seed = rng_();
queryThreads.emplace_back([&, i, seed]() {
FuzzerGenerator rng(seed);
while (!stop) {
try {
const auto queryCtx = newQueryCtx(
memory::memoryManager(),
executor_.get(),
FLAGS_arbitrator_capacity);
const auto plan = plans.at(getRandomIndex(rng, plans.size() - 1));
AssertQueryBuilder builder(plan.plan);
builder.queryCtx(queryCtx);
for (const auto& [planNodeId, nodeSplits] : plan.splits) {
builder.splits(planNodeId, nodeSplits);
}

if (coinToss(rng, 0.3)) {
builder.queryCtx(queryCtx).copyResults(pool_.get());
} else {
auto res =
builder.configs(queryConfigsWithSpill_)
.spillDirectory(
spillDirectory->getPath() + fmt::format("/{}/", i))
.queryCtx(queryCtx)
.copyResults(pool_.get());
}
++stats_.wlock()->successCount;
} catch (const VeloxException& e) {
auto lockedStats = stats_.wlock();
if (e.errorCode() == error_code::kMemCapExceeded.c_str()) {
++lockedStats->oomCount;
} else if (e.errorCode() == error_code::kMemAborted.c_str()) {
++lockedStats->abortCount;
} else {
LOG(ERROR) << "Unexpected exception:\n" << e.what();
std::rethrow_exception(std::current_exception());
queryThreads.emplace_back(
[&, spillDirectory = scopedSpillDirectory.directory(), i, seed]() {
FuzzerGenerator rng(seed);
while (!stop) {
const auto prevSpillFsFaultCount =
stats_.rlock()->spillFsFaultCount;
try {
const auto queryCtx = newQueryCtx(
memory::memoryManager(),
executor_.get(),
FLAGS_arbitrator_capacity);
const auto plan = plans.at(getRandomIndex(rng, plans.size() - 1));
AssertQueryBuilder builder(plan.plan);
builder.queryCtx(queryCtx);
for (const auto& [planNodeId, nodeSplits] : plan.splits) {
builder.splits(planNodeId, nodeSplits);
}

if (coinToss(rng, 0.3)) {
builder.queryCtx(queryCtx).copyResults(pool_.get());
} else {
auto res =
builder.configs(queryConfigsWithSpill_)
.spillDirectory(
spillDirectory->getPath() + fmt::format("/{}/", i))
.queryCtx(queryCtx)
.copyResults(pool_.get());
}
++stats_.wlock()->successCount;
VELOX_CHECK_EQ(
stats_.rlock()->spillFsFaultCount, prevSpillFsFaultCount);
} catch (const VeloxException& e) {
auto lockedStats = stats_.wlock();
if (e.errorCode() == error_code::kMemCapExceeded.c_str()) {
++lockedStats->oomCount;
} else if (e.errorCode() == error_code::kMemAborted.c_str()) {
++lockedStats->abortCount;
} else if (e.errorCode() == error_code::kInvalidState.c_str()) {
VELOX_CHECK_GT(
stats_.rlock()->spillFsFaultCount, prevSpillFsFaultCount);
VELOX_CHECK(
e.message().find("Fault file injection on") !=
std::string::npos);
} else {
LOG(ERROR) << "Unexpected exception:\n" << e.what();
std::rethrow_exception(std::current_exception());
}
}
}
}
}
});
});
}

std::this_thread::sleep_for(
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/MemoryArbitrationFuzzerRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "velox/common/file/FileSystems.h"

#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/exec/fuzzer/MemoryArbitrationFuzzer.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
Expand All @@ -31,6 +32,7 @@ class MemoryArbitrationFuzzerRunner {
static int run(size_t seed) {
serializer::presto::PrestoVectorSerde::registerVectorSerde();
filesystems::registerLocalFileSystem();
tests::utils::registerFaultyFileSystem();
functions::prestosql::registerAllScalarFunctions();
aggregate::prestosql::registerAllAggregateFunctions();
memoryArbitrationFuzzer(seed);
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/utils/TempDirectoryPath.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class TempDirectoryPath {
TempDirectoryPath(const TempDirectoryPath&) = delete;
TempDirectoryPath& operator=(const TempDirectoryPath&) = delete;

/// If fault injection is enabled, the returned the file path has the faulty
/// If fault injection is enabled, the returned file path will have the faulty
/// file system prefix scheme. The velox fs then opens the directory through
/// the faulty file system. The actual file operation might either fails or
/// delegate to the actual file.
/// the faulty file system. The file operation will then either fail or be
/// delegated to the actual file.
const std::string& getPath() const {
return path_;
}
Expand Down

0 comments on commit dd9e503

Please sign in to comment.