From 51fd6c3ad39bdf3c84a515884a7a49f80161235a Mon Sep 17 00:00:00 2001 From: Neill Miller Date: Tue, 8 Jan 2019 20:55:37 -0500 Subject: [PATCH] Enable consistent tx/block publishing by replacing internal pub-sub model for push pull. --- src/services/block_service.cpp | 18 +++++++++--------- src/services/transaction_service.cpp | 14 +++++++------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp index 702fc434..e1e9c167 100644 --- a/src/services/block_service.cpp +++ b/src/services/block_service.cpp @@ -74,19 +74,19 @@ bool block_service::start() void block_service::work() { zmq::socket xpub(authenticator_, role::extended_publisher, external_); - zmq::socket xsub(authenticator_, role::extended_subscriber, internal_); + zmq::socket puller(authenticator_, role::puller, internal_); // Bind sockets to the service and worker endpoints. - if (!started(bind(xpub, xsub))) + if (!started(bind(xpub, puller))) return; // TODO: tap in to failure conditions, such as high water. // BUGBUG: stop is insufficient to stop the worker, because of relay(). // Relay messages between subscriber and publisher (blocks on context). - relay(xpub, xsub); + relay(xpub, puller); // Unbind the sockets and exit this thread. - finished(unbind(xpub, xsub)); + finished(unbind(xpub, puller)); } // Bind/Unbind. @@ -177,11 +177,11 @@ void block_service::publish_blocks(uint32_t fork_height, if (stopped()) return; - zmq::socket publisher(authenticator_, role::publisher, internal_); + zmq::socket pusher(authenticator_, role::pusher, internal_); // Subscriptions are off the pub-sub thread so this must connect back. // This could be optimized by caching the socket as thread static. - const auto ec = publisher.connect(worker_); + const auto ec = pusher.connect(worker_); if (ec == error::service_stopped) return; @@ -195,14 +195,14 @@ void block_service::publish_blocks(uint32_t fork_height, } for (const auto block: *blocks) - publish_block(publisher, ++fork_height, block); + publish_block(pusher, ++fork_height, block); } // [ height:4 ] // [ block ] // The payload for block publication is delimited within the zeromq message. // This is required for compatability and inconsistent with query payloads. -void block_service::publish_block(zmq::socket& publisher, size_t height, +void block_service::publish_block(zmq::socket& pusher, size_t height, block_const_ptr block) { if (stopped()) @@ -217,7 +217,7 @@ void block_service::publish_block(zmq::socket& publisher, size_t height, broadcast.enqueue(block->to_data( system::message::version::level::canonical)); - const auto ec = publisher.send(broadcast); + const auto ec = pusher.send(broadcast); if (ec == error::service_stopped) return; diff --git a/src/services/transaction_service.cpp b/src/services/transaction_service.cpp index e40f06f2..959018e8 100644 --- a/src/services/transaction_service.cpp +++ b/src/services/transaction_service.cpp @@ -73,19 +73,19 @@ bool transaction_service::start() void transaction_service::work() { zmq::socket xpub(authenticator_, role::extended_publisher, external_); - zmq::socket xsub(authenticator_, role::extended_subscriber, internal_); + zmq::socket puller(authenticator_, role::puller, internal_); // Bind sockets to the service and worker endpoints. - if (!started(bind(xpub, xsub))) + if (!started(bind(xpub, puller))) return; // TODO: tap in to failure conditions, such as high water. // BUGBUG: stop is insufficient to stop the worker, because of relay(). // Relay messages between subscriber and publisher (blocks on context). - relay(xpub, xsub); + relay(xpub, puller); // Unbind the sockets and exit this thread. - finished(unbind(xpub, xsub)); + finished(unbind(xpub, puller)); } // Bind/Unbind. @@ -175,11 +175,11 @@ void transaction_service::publish_transaction(transaction_const_ptr tx) if (stopped()) return; - zmq::socket publisher(authenticator_, role::publisher, internal_); + zmq::socket pusher(authenticator_, role::pusher, internal_); // Subscriptions are off the pub-sub thread so this must connect back. // This could be optimized by caching the socket as thread static. - auto ec = publisher.connect(worker_); + auto ec = pusher.connect(worker_); if (ec == error::service_stopped) return; @@ -201,7 +201,7 @@ void transaction_service::publish_transaction(transaction_const_ptr tx) broadcast.enqueue_little_endian(++sequence_); broadcast.enqueue(tx->to_data(system::message::version::level::canonical)); - ec = publisher.send(broadcast); + ec = pusher.send(broadcast); if (ec == error::service_stopped) return;