From eef0e1ab09cbfbc994368c2fdc3a345c3883ed6d Mon Sep 17 00:00:00 2001 From: jordanmao Date: Fri, 13 Dec 2024 21:47:50 -0800 Subject: [PATCH 1/2] Automatically spawn python child process from provider C++ code --- BUILD | 10 +++++++--- include/IPC/zmq_receiver.h | 1 + include/IPC/zmq_sender.h | 1 + sorting_python/sorting.py | 5 +++-- src/IPC/zmq_receiver.cpp | 4 ++++ src/IPC/zmq_sender.cpp | 6 +++++- src/Peers/provider.cpp | 29 +++++++++++++++++++++++++---- 7 files changed, 46 insertions(+), 10 deletions(-) diff --git a/BUILD b/BUILD index 6728ab42f..94ad9030c 100755 --- a/BUILD +++ b/BUILD @@ -62,9 +62,8 @@ cc_binary( name = "provider", srcs = ["main.cpp"], defines = ["PROVIDER=1"] + local_defines, - deps = [ - ":src_files", - ], + deps = [":src_files"], + data = ["sorting_python/sorting.py"] ) cc_binary( @@ -74,6 +73,11 @@ cc_binary( deps = [":src_files"], ) +py_binary( + name = "sorting", + srcs = ["sorting_python/sorting.py"], +) + ################################# tests BUILD target ################################# cc_test( diff --git a/include/IPC/zmq_receiver.h b/include/IPC/zmq_receiver.h index 70a1589e8..ca70173a2 100644 --- a/include/IPC/zmq_receiver.h +++ b/include/IPC/zmq_receiver.h @@ -13,6 +13,7 @@ class ZMQReceiver { public: ZMQReceiver(); std::string receive(); + unsigned int getPort() const; }; #endif // _ZMQ_RECEIVER_H_ diff --git a/include/IPC/zmq_sender.h b/include/IPC/zmq_sender.h index f9016c537..1aabbf53a 100644 --- a/include/IPC/zmq_sender.h +++ b/include/IPC/zmq_sender.h @@ -13,6 +13,7 @@ class ZMQSender { public: ZMQSender(); void send(const std::string& message); + unsigned int getPort() const; }; #endif // _ZMQ_SENDER_H_ diff --git a/sorting_python/sorting.py b/sorting_python/sorting.py index 8d11ee65a..90603cb9d 100755 --- a/sorting_python/sorting.py +++ b/sorting_python/sorting.py @@ -1,8 +1,9 @@ import zmq +import sys # Set up the context and responder socket -port_rec = int(input("Enter the ZMQ sender port number: ")) -port_send = int(input("Enter the ZMQ receiver port number: ")) +port_rec = int(sys.argv[1]) +port_send = int(sys.argv[2]) context = zmq.Context() responder = context.socket(zmq.REP) diff --git a/src/IPC/zmq_receiver.cpp b/src/IPC/zmq_receiver.cpp index 74c5bc03a..023d7275d 100644 --- a/src/IPC/zmq_receiver.cpp +++ b/src/IPC/zmq_receiver.cpp @@ -19,3 +19,7 @@ std::string ZMQReceiver::receive() { return zmq_msg.to_string(); } + +unsigned int ZMQReceiver::getPort() const { + return port; +} diff --git a/src/IPC/zmq_sender.cpp b/src/IPC/zmq_sender.cpp index 99bbf4357..38487ac9a 100644 --- a/src/IPC/zmq_sender.cpp +++ b/src/IPC/zmq_sender.cpp @@ -14,4 +14,8 @@ void ZMQSender::send(const std::string& message) { zmq::message_t reply; socket.recv(reply, zmq::recv_flags::none); -} \ No newline at end of file +} + +unsigned int ZMQSender::getPort() const { + return port; +} diff --git a/src/Peers/provider.cpp b/src/Peers/provider.cpp index 1a0d7414b..1800891e2 100755 --- a/src/Peers/provider.cpp +++ b/src/Peers/provider.cpp @@ -7,6 +7,7 @@ #include "../../include/utility.h" #include +#include #include #include #include @@ -78,10 +79,30 @@ void Provider::listen() { server->closeConn(); } - if (task->getLeaderUuid() == uuid) { - leaderHandleTaskRequest(requesterIpAddr); - } else { - followerHandleTaskRequest(); + try { + // Spin up python script child process + std::string script_path = "sorting_python/sorting.py"; + std::vector args{ + std::to_string(zmq_sender.getPort()), + std::to_string(zmq_receiver.getPort())}; + boost::process::child python_script("/usr/bin/env", "python3", script_path, args); + + // Process P2P communication + if (task->getLeaderUuid() == uuid) { + leaderHandleTaskRequest(requesterIpAddr); + } else { + followerHandleTaskRequest(); + } + + // Wait for python script to terminate + python_script.wait(); + + int exit_code = python_script.exit_code(); + if (exit_code != 0) { + std::cout << "Error: Python child process exited with code: " << exit_code << std::endl; + } + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << std::endl; } } } From 9ef9625a43cbc6d31d5028c1c43e06dfe48f7fc8 Mon Sep 17 00:00:00 2001 From: jordanmao Date: Fri, 13 Dec 2024 22:42:44 -0800 Subject: [PATCH 2/2] Fix default python interpreter and add option to use env variable --- src/Peers/provider.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Peers/provider.cpp b/src/Peers/provider.cpp index 1800891e2..2eaff9b78 100755 --- a/src/Peers/provider.cpp +++ b/src/Peers/provider.cpp @@ -81,11 +81,13 @@ void Provider::listen() { try { // Spin up python script child process + const char* python_env = std::getenv("PYTHON_INTERPRETER"); + std::string python_interpreter = python_env ? python_env : "/usr/bin/python3"; std::string script_path = "sorting_python/sorting.py"; std::vector args{ std::to_string(zmq_sender.getPort()), std::to_string(zmq_receiver.getPort())}; - boost::process::child python_script("/usr/bin/env", "python3", script_path, args); + boost::process::child python_script(python_interpreter, script_path, args); // Process P2P communication if (task->getLeaderUuid() == uuid) {