Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 8, 2024
1 parent 9bf45ed commit 44300ab
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 248 deletions.
224 changes: 96 additions & 128 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ ClusterQueueHelper::QueueLiveState::QueueLiveState(
}

// MANIPULATORS
void ClusterQueueHelper::QueueLiveState::reset()
void ClusterQueueHelper::QueueLiveState::resetAndKeepPending()
{
// NOTE: Do not reset d_pending and d_inFlight, and some other data.

Expand Down Expand Up @@ -422,46 +422,46 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext)
return QueueAssignmentResult::k_ASSIGNMENT_OK;
}

void ClusterQueueHelper::onQueueAssigning(const bmqt::Uri& uri,
bool processingPendingRequests)
{
// executed by the cluster *DISPATCHER* thread
//
// Look up the queue context for the specified 'uri' in 'd_queues',
// creating and registering a new one if none exists.  When CSL mode is
// disabled, additionally bind the context to the matching queue info held
// in the cluster state and, if the specified 'processingPendingRequests'
// is true, invoke 'onQueueContextAssigned' on that context.

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

QueueContextSp queueContext;
QueueContextMapIter queueContextIt = d_queues.find(uri);
if (queueContextIt == d_queues.end()) {
// Queue unknown, create a new one
queueContext.reset(new (*d_allocator_p)
QueueContext(uri, d_allocator_p),
d_allocator_p);

// Register the freshly created context under its uri.
d_queues[uri] = queueContext;
}
else {
// Queue already known: reuse the existing context.
queueContext = queueContextIt->second;

if (d_cluster_p->isCSLModeEnabled()) {
// Zero the expiration timestamp of the live queue info.
// NOTE(review): presumably this takes the queue out of expiration
// consideration now that it is being (re)assigned -- confirm.
queueContext->d_liveQInfo.d_queueExpirationTimestampMs = 0;
}
}

if (!d_cluster_p->isCSLModeEnabled()) {
// Non-CSL mode only: point the context at the queue info stored in the
// cluster state for this uri's domain.
// NOTE(review): both '.at()' lookups presume the domain and the queue
// are already present in the cluster state -- confirm against callers.
queueContext->d_stateQInfo_sp = d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->queuesInfo()
.at(uri);

// Process the pending requests on this machine, if any.
if (processingPendingRequests) {
onQueueContextAssigned(queueContext);
}
}
}
// void ClusterQueueHelper::onQueueAssigning(const bmqt::Uri& uri,
// bool processingPendingRequests)
//{
// // executed by the cluster *DISPATCHER* thread
//
// // PRECONDITIONS
// BSLS_ASSERT_SAFE(
// d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
// BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());
//
// QueueContextSp queueContext;
// QueueContextMapIter queueContextIt = d_queues.find(uri);
// if (queueContextIt == d_queues.end()) {
// // Queue unknown, create a new one
// queueContext.reset(new (*d_allocator_p)
// QueueContext(uri, d_allocator_p),
// d_allocator_p);
//
// d_queues[uri] = queueContext;
// }
// else {
// queueContext = queueContextIt->second;
//
// if (d_cluster_p->isCSLModeEnabled()) {
// queueContext->d_liveQInfo.d_queueExpirationTimestampMs = 0;
// }
// }
//
// if (!d_cluster_p->isCSLModeEnabled()) {
// queueContext->d_stateQInfo_sp = d_clusterState_p->domainStates()
// .at(uri.qualifiedDomain())
// ->queuesInfo()
// .at(uri);
//
// // Process the pending requests on this machine, if any.
// if (processingPendingRequests) {
// onQueueContextAssigned(queueContext);
// }
// }
// }

bool ClusterQueueHelper::onQueueUnassigning(
bool* hasInFlightRequests,
Expand Down Expand Up @@ -541,7 +541,7 @@ bool ClusterQueueHelper::onQueueUnassigning(
<< queueContext->d_liveQInfo.d_pending.size() << "]";

d_queuesById.erase(queueContext->d_liveQInfo.d_id);
queueContext->d_liveQInfo.reset();
queueContext->d_liveQInfo.resetAndKeepPending();

*hasInFlightRequests = true;
}
Expand Down Expand Up @@ -4080,11 +4080,6 @@ void ClusterQueueHelper::onQueueAssigned(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
// REVISIT
return; // RETURN
}

const mqbnet::ClusterNode* leaderNode =
d_clusterData_p->electorInfo().leaderNode();
const bsl::string& leaderDescription = leaderNode
Expand Down Expand Up @@ -4135,11 +4130,8 @@ void ClusterQueueHelper::onQueueAssigned(
// Update queue's mapping etc.
mqbc::ClusterState::QueueKeysInsertRc insertRc =
d_clusterState_p->queueKeys().insert(info.key());
if (insertRc.second) {
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->adjustQueueCount(1);
}
BSLS_ASSERT_SAFE(insertRc.second);
(void)insertRc.second;
}
}
else {
Expand All @@ -4161,22 +4153,19 @@ void ClusterQueueHelper::onQueueAssigned(
return; // RETURN
}

d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->adjustQueueCount(1);

// Create the queueContext.
queueContext.reset(new (*d_allocator_p)
QueueContext(info.uri(), d_allocator_p),
d_allocator_p);

d_queues[info.uri()] = queueContext;
}
mqbc::ClusterState::DomainState& domainState =
*d_clusterState_p->domainStates().at(info.uri().qualifiedDomain());

domainState.adjustQueueCount(1);

queueContext->d_stateQInfo_sp = d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->queuesInfo()
.at(info.uri());
queueContext->d_stateQInfo_sp = domainState.queuesInfo().at(info.uri());
// Queue assignment from the leader is honored per the info updated
// above

Expand All @@ -4186,31 +4175,33 @@ void ClusterQueueHelper::onQueueAssigned(
// Note: In non-CSL mode, the queue creation callback is instead invoked at
// replica nodes when they receive a queue creation record from the primary
// in the partition stream.
if (!d_clusterState_p->isSelfPrimary(info.partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered in
// the StorageMgr if it was a queue found during storage recovery.
// Therefore, we will allow for duplicate registration which will
// simply result in a no-op.
d_storageManager_p->registerQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
info.appInfos(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
true); // allowDuplicate
if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(info.partitionId())) {
// This is a replica node

// Note: It's possible that the queue has already been registered
// in the StorageMgr if it was a queue found during storage
// recovery. Therefore, we will allow for duplicate registration
// which will simply result in a no-op.
d_storageManager_p->registerQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
true); // allowDuplicate

d_storageManager_p->updateQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
info.appInfos(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
true); // allowDuplicate
}
}

// NOTE: Even if it is not needed to invoke 'onQueueContextAssigned' in the
Expand Down Expand Up @@ -4330,7 +4321,9 @@ void ClusterQueueHelper::onQueueUnassigned(
-1);
}
d_queuesById.erase(qinfo.d_id);
qinfo.reset();
qinfo.resetAndKeepPending();
// CQH will recreate 'queueContextSp->d_liveQInfo.d_queue_sp' upon
// 'onOpenQueueResponse'

// We do this in CSL mode only, such that isQueueAssigned() will
// return false.
Expand Down Expand Up @@ -4514,11 +4507,11 @@ ClusterQueueHelper::ClusterQueueHelper(
// response processed first for the closeQueue)

if (d_clusterStateManager_p) {
d_clusterStateManager_p->setQueueAssigningCb(bdlf::BindUtil::bind(
&ClusterQueueHelper::onQueueAssigning,
this,
bdlf::PlaceHolders::_1, // uri
bdlf::PlaceHolders::_2)); // processingPendingRequests
// d_clusterStateManager_p->setQueueAssigningCb(bdlf::BindUtil::bind(
// &ClusterQueueHelper::onQueueAssigning,
// this,
// bdlf::PlaceHolders::_1, // uri
// bdlf::PlaceHolders::_2)); // processingPendingRequests
d_clusterStateManager_p->setQueueUnassigningCb(bdlf::BindUtil::bind(
&ClusterQueueHelper::onQueueUnassigning,
this,
Expand Down Expand Up @@ -4619,43 +4612,6 @@ void ClusterQueueHelper::openQueue(

QueueContextMapIter queueContextIt = d_queues.find(uriKey);

// NOTE: See TBD in 'onGetDomainDispatched': if the request comes from a
// peer inside the cluster, 'clientIdentity' will represent our own
// identity instead of that of the peer; which is obviously wrong;
// however, here we only want to use it to determine whether the
// request comes from a peer node in the cluster (and not a client or
// a proxy broker), and so this is still fine.
if (clientContext->identity().clientType() ==
bmqp_ctrlmsg::ClientType::E_TCPBROKER &&
!clientContext->identity().clusterName().empty() &&
clientContext->identity().clusterNodeId() !=
mqbnet::Cluster::k_INVALID_NODE_ID) {
// The request came from a peer in the cluster, make sure we are the
// primary for the partition. Since we received the openQueue request
// from a in-cluster peer node, we should have already received a queue
// advisory assignment from the leader about that queue; however maybe
// events will come out of order, so just return a NOT_PRIMARY
// retryable error in this case and let the peer re-emit a request.
bsl::string reason;
mqbi::ClusterErrorCode::Enum errorCode =
mqbi::ClusterErrorCode::e_UNKNOWN;
if (queueContextIt == d_queues.end()) {
reason = "Not aware of that queue";
errorCode = mqbi::ClusterErrorCode::e_UNKNOWN_QUEUE;
CALLBACK_FAILURE(reason, errorCode);
return; // RETURN
}
const int pid = queueContextIt->second->partitionId();
if (!isSelfAvailablePrimary(pid)) {
bmqu::MemOutStream errorDesc;
errorDesc << "Not the primary for partitionId [" << pid << "]";
reason = errorDesc.str();
errorCode = mqbi::ClusterErrorCode::e_NOT_PRIMARY;
CALLBACK_FAILURE(reason, errorCode);
return; // RETURN
}
}

// Create an OpenQueue context for that request.
OpenQueueContext context;
context.d_domain_p = domain;
Expand Down Expand Up @@ -4752,6 +4708,9 @@ void ClusterQueueHelper::openQueue(
if (!isQueueAssigned(*(queueContextIt->second))) {
assignQueue(queueContextIt->second);
}
else {
cancelUnassignmentIfNeeded(*queueContextIt->second);
}
}
}
else {
Expand Down Expand Up @@ -6396,6 +6355,15 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
return rc_SUCCESS; // RETURN
}

void ClusterQueueHelper::cancelUnassignmentIfNeeded(QueueContext& queueContext)
{
// Placeholder implementation: detect whether the specified 'queueContext'
// carries state queue info that reports a pending unassignment.  No action
// is taken yet when that is the case.

// 'd_stateQInfo_sp' may be unset, so only dereference it when present.
if (queueContext.d_stateQInfo_sp) {
if (queueContext.d_stateQInfo_sp->pendingUnassignment()) {
// TODO: cancel the pending unassignment (not implemented yet)
}
}
}

void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const
{
// executed by the cluster *DISPATCHER* thread
Expand Down
34 changes: 17 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
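/// Reset the `id`, `partitionId`, `key` and `queue` members of this
/// object, while keeping the pending and in-flight request state
/// (`d_pending`, `d_inFlight`) intact.  Note that `uri` is also left
/// untouched because it is an invariant member of a given instance of
/// such a QueueInfo object.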
void reset();
void resetAndKeepPending();
};

struct StopContext {
Expand Down Expand Up @@ -522,14 +522,15 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
QueueAssignmentResult::Enum
assignQueue(const QueueContextSp& queueContext);

/// Called when the specified `uri` is in the process of being assigned.
/// If the specified `processingPendingRequests` is true, we will
/// process pending requests on this machine.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigning(const bmqt::Uri& uri,
bool processingPendingRequests);
// /// Called when the specified `uri` is in the process of being
// /// assigned.  If the specified `processingPendingRequests` is true,
// /// we will process pending requests on this machine.
// ///
// /// THREAD: This method is invoked in the associated cluster's
// /// dispatcher thread.
// void onQueueAssigning(const bmqt::Uri& uri,
// bool processingPendingRequests);

/// Called when the queue with the specified `queueInfo` is being
/// unassigned. Load into the specified `hasInFlightRequests` whether
Expand Down Expand Up @@ -1108,13 +1109,6 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// failure.
int gcExpiredQueues(bool immediate = false);

ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value);

/// Set the corresponding member to the specified `value` and return a
/// reference offering modifiable access to this object.
ClusterQueueHelper&
setOnQueueUnassignedCb(const OnQueueUnassignedCb& value);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand All @@ -1134,8 +1128,14 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
mqbc::ClusterNodeSession* ns,
const VoidFunctor& callback = VoidFunctor());

/// Called upon leader becoming available.
void onLeaderAvailable();
// Called upon leader becoming available.

/// If the specified 'queueContext' is waiting for QueueUnassignment CSL
/// commit, cancel it.
/// This is to prevent 'onQueueUnassigned' destroying results of openQueue
/// processed while waiting for QueueUnassignment.
void cancelUnassignmentIfNeeded(QueueContext& queueContext);

// ACCESSORS

Expand Down
Loading

0 comments on commit 44300ab

Please sign in to comment.