Skip to content

Commit

Permalink
fix sharding call lock bug
Browse files Browse the repository at this point in the history
  • Loading branch information
JimmyShi22 committed Mar 21, 2024
1 parent 032bd17 commit 2e7f935
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 38 deletions.
5 changes: 1 addition & 4 deletions bcos-executor/src/executive/CoroutineTransactionExecutive.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ class CoroutineTransactionExecutive : public TransactionExecutive
virtual std::optional<Coroutine::push_type>& getPushMessage() { return m_pushMessage; }
virtual CallParameters::UniquePtr& getExchangeMessageRef() { return m_exchangeMessage; }

virtual std::shared_ptr<SyncStorageWrapper> getSyncStorageWrapper()
{
return m_syncStorageWrapper;
}
std::shared_ptr<SyncStorageWrapper> getSyncStorageWrapper() { return m_syncStorageWrapper; }

protected:
CallParameters::UniquePtr m_exchangeMessage = nullptr;
Expand Down
170 changes: 170 additions & 0 deletions bcos-executor/src/executive/ShardingSyncStorageWrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#pragma once

#include "SyncStorageWrapper.h"
#include "bcos-executor/src/Common.h"
#include "bcos-table/src/StorageWrapper.h"

namespace bcos::executor
{
using KeyLockResponse = std::tuple<Error::UniquePtr>;
using AcquireKeyLockResponse = std::tuple<Error::UniquePtr, std::vector<std::string>>;

class ShardingKeyLocks
{
public:
using Ptr = std::shared_ptr<ShardingKeyLocks>;
ShardingKeyLocks() = default;
virtual ~ShardingKeyLocks() = default;

static std::string keyName(const std::string_view& address, const std::string_view& key)
{
return std::string(address) + "-" + std::string(key);
}


// Notice: this function is just for testing, because it assume passing the right address format
static std::pair<std::string_view, std::string_view> toAddressAndKey(
const std::string_view& keyName)
{
std::string_view str = keyName;
// if keyName start from 0x, remove it
if (keyName.size() > 2 && keyName[0] == '0' && keyName[1] == 'x')
{
str = keyName.substr(2);
}

// address is 40 characters, separate it
return std::make_pair(str.substr(0, 40), str.substr(41));
}


bool existsKeyLock(const std::string_view& address, const std::string_view& key)
{
return m_existsKeyLocks.find(keyName(address, key)) != m_existsKeyLocks.end();
}

void importExistsKeyLocks(gsl::span<std::string> keyLocks)
{
m_existsKeyLocks.clear();

for (auto& it : keyLocks)
{
m_existsKeyLocks.emplace(std::move(it));
}
}

std::vector<std::string> exportKeyLocks()
{
std::vector<std::string> keyLocks;
keyLocks.reserve(m_myKeyLocks.size());
for (auto& it : m_myKeyLocks)
{
keyLocks.emplace_back(std::move(it));
}

m_myKeyLocks.clear();

return keyLocks;
}

void importMyKeyLock(const std::string_view& address, const std::string_view& key)
{
auto keyName = ShardingKeyLocks::keyName(address, key);
auto it = m_myKeyLocks.lower_bound(keyName);
if (it == m_myKeyLocks.end() || *it != keyName)
{
m_myKeyLocks.emplace_hint(it, keyName);
}
}

private:
std::set<std::string, std::less<>> m_existsKeyLocks;
std::set<std::string, std::less<>> m_myKeyLocks;
};

class ShardingSyncStorageWrapper : public SyncStorageWrapper
{
public:
using Ptr = std::shared_ptr<ShardingSyncStorageWrapper>;

ShardingSyncStorageWrapper(ShardingKeyLocks::Ptr shardingKeyLocks,
storage::StateStorageInterface::Ptr storage,
std::function<void(std::string)> externalAcquireKeyLocks,
bcos::storage::Recoder::Ptr recoder)
: SyncStorageWrapper(storage, externalAcquireKeyLocks, recoder),
m_externalAcquireKeyLocks(std::move(externalAcquireKeyLocks)),
m_shardingKeyLocks(shardingKeyLocks)
{}

ShardingSyncStorageWrapper(const ShardingSyncStorageWrapper&) = delete;
ShardingSyncStorageWrapper(ShardingSyncStorageWrapper&&) = delete;
ShardingSyncStorageWrapper& operator=(const ShardingSyncStorageWrapper&) = delete;
ShardingSyncStorageWrapper& operator=(ShardingSyncStorageWrapper&&) = delete;


std::optional<storage::Entry> getRow(
const std::string_view& table, const std::string_view& _key) override
{
acquireKeyLock(_key);

return StorageWrapper::getRow(table, _key);
}

std::vector<std::optional<storage::Entry>> getRows(const std::string_view& table,
RANGES::any_view<std::string_view,
RANGES::category::input | RANGES::category::random_access | RANGES::category::sized>
keys) override
{
for (auto it : keys)
{
acquireKeyLock(it);
}

return StorageWrapper::getRows(table, keys);
}

void setRow(
const std::string_view& table, const std::string_view& key, storage::Entry entry) override
{
acquireKeyLock(key);

StorageWrapper::setRow(table, key, std::move(entry));
}

void importExistsKeyLocks(gsl::span<std::string> keyLocks) override
{
m_shardingKeyLocks->importExistsKeyLocks(std::move(keyLocks));
}

std::vector<std::string> exportKeyLocks() override
{
return m_shardingKeyLocks->exportKeyLocks();
}

ShardingKeyLocks::Ptr getKeyLocks() { return m_shardingKeyLocks; }

private:
void acquireKeyLock(const std::string_view& key)
{
// ignore code, shard, codeHash
if (!key.compare(ACCOUNT_CODE) || !key.compare(ACCOUNT_SHARD) ||
!key.compare(ACCOUNT_CODE_HASH))
{
// ignore static system key
return;
}

if (m_shardingKeyLocks->existsKeyLock(m_contractAddress, key))
{
m_externalAcquireKeyLocks(ShardingKeyLocks::keyName(m_contractAddress, key));
}

m_shardingKeyLocks->importMyKeyLock(m_contractAddress, key);
}

std::function<void(std::string)> m_externalAcquireKeyLocks;

ShardingKeyLocks::Ptr m_shardingKeyLocks;
std::string m_contractAddress;
};
} // namespace bcos::executor
54 changes: 52 additions & 2 deletions bcos-executor/src/executive/ShardingTransactionExecutive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include "ShardingTransactionExecutive.h"
#include "ShardingSyncStorageWrapper.h"
#include "bcos-table/src/ContractShardUtils.h"

using namespace bcos::executor;
Expand All @@ -14,7 +15,18 @@ ShardingTransactionExecutive::ShardingTransactionExecutive(const BlockContext& b
: PromiseTransactionExecutive(
pool, std::move(blockContext), std::move(contractAddress), contextID, seq, gasInjector),
m_usePromise(usePromise)
{}
{
if (m_blockContext.features().get(
ledger::Features::Flag::bugfix_sharding_call_in_child_executive))
{
m_syncStorageWrapper = std::make_shared<ShardingSyncStorageWrapper>(
std::make_shared<ShardingKeyLocks>(), m_blockContext.storage(),
m_syncStorageWrapper->takeExternalAcquireKeyLocks(), m_recoder);
m_storageWrapper = m_syncStorageWrapper.get();
m_storageWrapper->setCodeCache(m_blockContext.getCodeCache());
m_storageWrapper->setCodeHashCache(m_blockContext.getCodeHashCache());
}
}

CallParameters::UniquePtr ShardingTransactionExecutive::start(CallParameters::UniquePtr input)
{
Expand Down Expand Up @@ -60,7 +72,19 @@ CallParameters::UniquePtr ShardingTransactionExecutive::externalCall(
m_shardName = getContractShard(m_contractAddress);
}

auto toShardName = getContractShard(input->receiveAddress);
std::string_view to = input->receiveAddress;

if (m_blockContext.features().get(
ledger::Features::Flag::bugfix_sharding_call_in_child_executive))
{
if (input->data == bcos::protocol::GET_CODE_INPUT_BYTES)
{
to = input->codeAddress;
}
}

std::string toShardName = getContractShard(to);

if (toShardName != m_shardName.value())
{
EXECUTIVE_LOG(DEBUG) << LOG_BADGE("Sharding")
Expand Down Expand Up @@ -100,4 +124,30 @@ std::string ShardingTransactionExecutive::getContractShard(const std::string_vie
{
auto tableName = getContractTableName(contractAddress, m_blockContext.isWasm());
return ContractShardUtils::getContractShard(storage(), tableName);
}

ShardingChildTransactionExecutive::ShardingChildTransactionExecutive(
ShardingTransactionExecutive* parent, const BlockContext& blockContext,
std::string contractAddress, int64_t contextID, int64_t seq,
const wasm::GasInjector& gasInjector, ThreadPool::Ptr pool, bool usePromise)
: ShardingTransactionExecutive(
blockContext, contractAddress, contextID, seq, gasInjector, pool, usePromise),

// for coroutine
m_pullMessageRef(parent->getPullMessage()),
m_pushMessageRef(parent->getPushMessage()),
m_exchangeMessageRef(parent->getExchangeMessageRef())
{
auto parentKeyLocks =
dynamic_pointer_cast<ShardingSyncStorageWrapper>(parent->getSyncStorageWrapper())
->getKeyLocks();

m_syncStorageWrapper = std::make_shared<ShardingSyncStorageWrapper>(parentKeyLocks,
m_blockContext.storage(), m_syncStorageWrapper->takeExternalAcquireKeyLocks(), m_recoder);
m_storageWrapper = m_syncStorageWrapper.get();
m_storageWrapper->setCodeCache(m_blockContext.getCodeCache());
m_storageWrapper->setCodeHashCache(m_blockContext.getCodeHashCache());

// for promise executive
setPromiseMessageSwapper(parent->getPromiseMessageSwapper());
}
16 changes: 1 addition & 15 deletions bcos-executor/src/executive/ShardingTransactionExecutive.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,7 @@ class ShardingChildTransactionExecutive : public ShardingTransactionExecutive
ShardingChildTransactionExecutive(ShardingTransactionExecutive* parent,
const BlockContext& blockContext, std::string contractAddress, int64_t contextID,
int64_t seq, const wasm::GasInjector& gasInjector, ThreadPool::Ptr pool = nullptr,
bool usePromise = false)
: ShardingTransactionExecutive(
blockContext, contractAddress, contextID, seq, gasInjector, pool, usePromise),

// for coroutine
m_pullMessageRef(parent->getPullMessage()),
m_pushMessageRef(parent->getPushMessage()),
m_exchangeMessageRef(parent->getExchangeMessageRef())
{
m_syncStorageWrapper = parent->getSyncStorageWrapper();
m_storageWrapper = m_syncStorageWrapper.get();

// for promise executive
setPromiseMessageSwapper(parent->getPromiseMessageSwapper());
}
bool usePromise = false);

CallParameters::UniquePtr start(CallParameters::UniquePtr input) override
{
Expand Down
15 changes: 10 additions & 5 deletions bcos-executor/src/executive/SyncStorageWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ class SyncStorageWrapper : public storage::StorageWrapper
SyncStorageWrapper& operator=(SyncStorageWrapper&&) = delete;


std::optional<storage::Entry> getRow(
virtual std::optional<storage::Entry> getRow(
const std::string_view& table, const std::string_view& _key) override
{
acquireKeyLock(_key);

return StorageWrapper::getRow(table, _key);
}

std::vector<std::optional<storage::Entry>> getRows(const std::string_view& table,
virtual std::vector<std::optional<storage::Entry>> getRows(const std::string_view& table,
RANGES::any_view<std::string_view,
RANGES::category::input | RANGES::category::random_access | RANGES::category::sized>
keys) override
Expand All @@ -55,15 +55,15 @@ class SyncStorageWrapper : public storage::StorageWrapper
return StorageWrapper::getRows(table, keys);
}

void setRow(
virtual void setRow(
const std::string_view& table, const std::string_view& key, storage::Entry entry) override
{
acquireKeyLock(key);

StorageWrapper::setRow(table, key, std::move(entry));
}

void importExistsKeyLocks(gsl::span<std::string> keyLocks)
virtual void importExistsKeyLocks(gsl::span<std::string> keyLocks)
{
m_existsKeyLocks.clear();

Expand All @@ -73,7 +73,7 @@ class SyncStorageWrapper : public storage::StorageWrapper
}
}

std::vector<std::string> exportKeyLocks()
virtual std::vector<std::string> exportKeyLocks()
{
std::vector<std::string> keyLocks;
keyLocks.reserve(m_myKeyLocks.size());
Expand All @@ -87,6 +87,11 @@ class SyncStorageWrapper : public storage::StorageWrapper
return keyLocks;
}

std::function<void(std::string)> takeExternalAcquireKeyLocks()
{
return std::move(m_externalAcquireKeyLocks);
}

private:
void acquireKeyLock(const std::string_view& key)
{
Expand Down
2 changes: 2 additions & 0 deletions bcos-executor/src/executive/TransactionExecutive.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ class TransactionExecutive : public std::enable_shared_from_this<TransactionExec
std::string getContractTableName(
const std::string_view& _address, bool isWasm = false, bool isCreate = false);

std::shared_ptr<storage::Recoder> getRecoder() { return m_recoder; }

protected:
std::tuple<std::unique_ptr<HostContext>, CallParameters::UniquePtr> call(
CallParameters::UniquePtr callParameters);
Expand Down
Loading

0 comments on commit 2e7f935

Please sign in to comment.