Skip to content

Commit

Permalink
[ETFeeder] Resolve deps based on not only data_deps, but also ctrl_deps
Browse files Browse the repository at this point in the history
  • Loading branch information
changhai0109 committed Jan 19, 2024
1 parent 8f0edff commit 4f5c109
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
19 changes: 6 additions & 13 deletions et_feeder/et_feeder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,8 @@ void ETFeeder::freeChildrenNodes(uint64_t node_id) {
shared_ptr<ETFeederNode> node = dep_graph_[node_id];
for (auto child : node->getChildren()) {
auto child_chakra = child->getChakraNode();
for (auto it = child_chakra->mutable_data_deps()->begin();
it != child_chakra->mutable_data_deps()->end();
++it) {
if (*it == node_id) {
child_chakra->mutable_data_deps()->erase(it);
break;
}
}
if (child_chakra->data_deps().size() == 0) {
child->releaseParent(node_id);
if (child->getUnreleasedParents().size() == 0) {
dep_free_node_id_set_.emplace(child_chakra->id());
dep_free_node_queue_.emplace(child);
}
Expand All @@ -95,13 +88,13 @@ shared_ptr<ETFeederNode> ETFeeder::readNode() {
shared_ptr<ETFeederNode> node = make_shared<ETFeederNode>(pkt_msg);

bool dep_unresolved = false;
for (int i = 0; i < pkt_msg->data_deps_size(); ++i) {
auto parent_node = dep_graph_.find(pkt_msg->data_deps(i));
for (uint64_t parent_node_id : node->all_deps()) {
auto parent_node = dep_graph_.find(parent_node_id);
if (parent_node != dep_graph_.end()) {
parent_node->second->addChild(node);
} else {
dep_unresolved = true;
node->addDepUnresolvedParentID(pkt_msg->data_deps(i));
node->addDepUnresolvedParentID(parent_node_id);
}
}

Expand Down Expand Up @@ -160,7 +153,7 @@ void ETFeeder::readNextWindow() {
uint64_t node_id = node_id_node.first;
shared_ptr<ETFeederNode> node = node_id_node.second;
if ((dep_free_node_id_set_.count(node_id) == 0) &&
(node->getChakraNode()->data_deps().size() == 0)) {
(node->getUnreleasedParents().size() == 0)) {
dep_free_node_id_set_.emplace(node_id);
dep_free_node_queue_.emplace(node);
}
Expand Down
28 changes: 28 additions & 0 deletions et_feeder/et_feeder_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
assign_attr_val(node, i, (void*)(&comm_tag_));
}
}

for (const auto& parent_id : node->data_deps())
this->all_deps_.emplace(parent_id);
for (const auto& parent_id : node->ctrl_deps())
this->all_deps_.emplace(parent_id);

for (const auto& parent_id : this->all_deps_)
this->unreleased_parent_ids_.emplace(parent_id);
}

shared_ptr<ChakraProtoMsg::Node> ETFeederNode::getChakraNode() {
Expand Down Expand Up @@ -67,6 +75,26 @@ void ETFeederNode::setDepUnresolvedParentIDs(
dep_unresolved_parent_ids_ = dep_unresolved_parent_ids;
}

void ETFeederNode::releaseParent(const uint64_t& parent_id) {
this->unreleased_parent_ids_.erase(parent_id);
}

const std::unordered_set<uint64_t>& ETFeederNode::getUnreleasedParents() {
return this->unreleased_parent_ids_;
}

const auto& ETFeederNode::data_deps() {
return this->getChakraNode()->data_deps();
}

const auto& ETFeederNode::ctrl_deps() {
return this->getChakraNode()->ctrl_deps();
}

const std::unordered_set<uint64_t>& ETFeederNode::all_deps() {
return this->all_deps_;
}

void ETFeederNode::assign_attr_val(
shared_ptr<ChakraProtoMsg::Node> node,
int i,
Expand Down
8 changes: 8 additions & 0 deletions et_feeder/et_feeder_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class ETFeederNode {
std::vector<uint64_t> getDepUnresolvedParentIDs();
void setDepUnresolvedParentIDs(
std::vector<uint64_t> const& dep_unresolved_parent_ids);
void releaseParent(const uint64_t& parent_id);
const std::unordered_set<uint64_t>& getUnreleasedParents();
const auto& data_deps();
const auto& ctrl_deps();
const std::unordered_set<uint64_t>& all_deps();

uint64_t id();
std::string name();
Expand Down Expand Up @@ -46,6 +51,9 @@ class ETFeederNode {
std::unordered_set<std::shared_ptr<ETFeederNode>> children_set_{};
std::vector<std::shared_ptr<ETFeederNode>> children_vec_{};
std::vector<uint64_t> dep_unresolved_parent_ids_{};
std::unordered_set<uint64_t> unreleased_parent_ids_{};

std::unordered_set<uint64_t> all_deps_{};

uint64_t id_;
std::string name_;
Expand Down

0 comments on commit 4f5c109

Please sign in to comment.