Skip to content

Commit

Permalink
Merge pull request #512 from thecodefactory/puller
Browse files Browse the repository at this point in the history
Enable consistent tx/block publishing
  • Loading branch information
thecodefactory authored Jan 9, 2019
2 parents ad98c1f + 51fd6c3 commit 8a8a33c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
18 changes: 9 additions & 9 deletions src/services/block_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,19 @@ bool block_service::start()
void block_service::work()
{
zmq::socket xpub(authenticator_, role::extended_publisher, external_);
zmq::socket xsub(authenticator_, role::extended_subscriber, internal_);
zmq::socket puller(authenticator_, role::puller, internal_);

// Bind sockets to the service and worker endpoints.
if (!started(bind(xpub, xsub)))
if (!started(bind(xpub, puller)))
return;

// TODO: tap in to failure conditions, such as high water.
// BUGBUG: stop is insufficient to stop the worker, because of relay().
// Relay messages between subscriber and publisher (blocks on context).
relay(xpub, xsub);
relay(xpub, puller);

// Unbind the sockets and exit this thread.
finished(unbind(xpub, xsub));
finished(unbind(xpub, puller));
}

// Bind/Unbind.
Expand Down Expand Up @@ -177,11 +177,11 @@ void block_service::publish_blocks(uint32_t fork_height,
if (stopped())
return;

zmq::socket publisher(authenticator_, role::publisher, internal_);
zmq::socket pusher(authenticator_, role::pusher, internal_);

// Subscriptions are off the pub-sub thread so this must connect back.
// This could be optimized by caching the socket as thread static.
const auto ec = publisher.connect(worker_);
const auto ec = pusher.connect(worker_);

if (ec == error::service_stopped)
return;
Expand All @@ -195,14 +195,14 @@ void block_service::publish_blocks(uint32_t fork_height,
}

for (const auto block: *blocks)
publish_block(publisher, ++fork_height, block);
publish_block(pusher, ++fork_height, block);
}

// [ height:4 ]
// [ block ]
// The payload for block publication is delimited within the zeromq message.
// This is required for compatability and inconsistent with query payloads.
void block_service::publish_block(zmq::socket& publisher, size_t height,
void block_service::publish_block(zmq::socket& pusher, size_t height,
block_const_ptr block)
{
if (stopped())
Expand All @@ -217,7 +217,7 @@ void block_service::publish_block(zmq::socket& publisher, size_t height,
broadcast.enqueue(block->to_data(
system::message::version::level::canonical));

const auto ec = publisher.send(broadcast);
const auto ec = pusher.send(broadcast);

if (ec == error::service_stopped)
return;
Expand Down
14 changes: 7 additions & 7 deletions src/services/transaction_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ bool transaction_service::start()
void transaction_service::work()
{
zmq::socket xpub(authenticator_, role::extended_publisher, external_);
zmq::socket xsub(authenticator_, role::extended_subscriber, internal_);
zmq::socket puller(authenticator_, role::puller, internal_);

// Bind sockets to the service and worker endpoints.
if (!started(bind(xpub, xsub)))
if (!started(bind(xpub, puller)))
return;

// TODO: tap in to failure conditions, such as high water.
// BUGBUG: stop is insufficient to stop the worker, because of relay().
// Relay messages between subscriber and publisher (blocks on context).
relay(xpub, xsub);
relay(xpub, puller);

// Unbind the sockets and exit this thread.
finished(unbind(xpub, xsub));
finished(unbind(xpub, puller));
}

// Bind/Unbind.
Expand Down Expand Up @@ -175,11 +175,11 @@ void transaction_service::publish_transaction(transaction_const_ptr tx)
if (stopped())
return;

zmq::socket publisher(authenticator_, role::publisher, internal_);
zmq::socket pusher(authenticator_, role::pusher, internal_);

// Subscriptions are off the pub-sub thread so this must connect back.
// This could be optimized by caching the socket as thread static.
auto ec = publisher.connect(worker_);
auto ec = pusher.connect(worker_);

if (ec == error::service_stopped)
return;
Expand All @@ -201,7 +201,7 @@ void transaction_service::publish_transaction(transaction_const_ptr tx)
broadcast.enqueue_little_endian(++sequence_);
broadcast.enqueue(tx->to_data(system::message::version::level::canonical));

ec = publisher.send(broadcast);
ec = pusher.send(broadcast);

if (ec == error::service_stopped)
return;
Expand Down

0 comments on commit 8a8a33c

Please sign in to comment.