From d12b9fecc485e947c0fa9a70f0f46d1e6d8cdc5a Mon Sep 17 00:00:00 2001 From: clfreville2 Date: Mon, 16 Oct 2023 14:04:31 +0200 Subject: [PATCH] Prepare ZeroMQ --- CMakeLists.txt | 13 +++++++++++-- src/main.cpp | 49 +++++++++++++++++++++++++++++++++++++------------ 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a99602d..899a1d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,14 @@ project(planificador) add_compile_options(-Wall -Wextra -pedantic) include(FetchContent) + +FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq.git + GIT_TAG v4.10.0 +) +FetchContent_MakeAvailable(cppzmq) + FetchContent_Declare( tomlplusplus GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git @@ -12,8 +20,9 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(tomlplusplus) -file(GLOB nameFile "src/*.cpp") -add_executable(planificador src/host.cpp src/runner.cpp src/config.cpp src/main.cpp ) +add_executable(planificador src/host.cpp src/runner.cpp src/config.cpp src/main.cpp) target_compile_features(planificador PUBLIC cxx_std_17) include_directories(${tomlplusplus_SOURCE_DIR}/include) +include_directories(${cppzmq_SOURCE_DIR}) +target_link_libraries(planificador PRIVATE zmq) diff --git a/src/main.cpp b/src/main.cpp index 7672bb8..9737048 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,8 +11,7 @@ #include #include "host.hpp" - -namespace fs = std::filesystem; +#include "zmq_addon.hpp" sk::runner_backend detect_backend() { const char *const argv[] = {"docker", "stats", "--no-stream", nullptr}; @@ -32,23 +31,49 @@ sk::runner_backend detect_backend() { } int main() { - std::vector listsHosts = sk::config::loadHostsFromToml("../conf.toml"); + std::vector listsHosts = + sk::config::loadHostsFromToml("../conf.toml"); std::priority_queue hosts(listsHosts.begin(), listsHosts.end()); + std::queue programQueue; + if (hosts.empty()) { std::cerr << "Pas de host" << std::endl; + exit(1); } - std::queue queue; - queue.push(sk::program{"echo $(( 1 + 2 ))", "ghcr.io/moshell-lang/moshell:master"}); - sk::runner runner(detect_backend()); - while (!queue.empty()) { - const sk::program ¤t = queue.front(); - sk::run_result res = runner.run_blocking(current); - std::cout << "out: " << res.out << "\n"; - std::cout << "err: " << res.err << "\n"; - queue.pop(); + + zmq::context_t context(1); + zmq::socket_t receiver(context, zmq::socket_type::pull); + receiver.connect("tcp://localhost:5557"); + zmq::socket_t sender(context, zmq::socket_type::push); + sender.connect("tcp://localhost:5558"); + + while (true) { + zmq::message_t request; + receiver.recv(request); + if (request.size() < 32) { + std::cerr << "Invalid request" << std::endl; + break; + } + std::string_view jobId(static_cast(request.data()), 32); + std::string requestString(static_cast(request.data()) + 32, + request.size() - 32); + + std::cout << "Executing " << request.size() << " bytes code.\n"; + sk::program program{requestString, + "ghcr.io/moshell-lang/moshell:master"}; + sk::run_result result = runner.run_blocking(program); + + std::cout << "Result: " << result.out << std::endl; + + // Send the job id and result.out to sink + zmq::message_t reply(32 + result.out.size()); + memcpy(reply.data(), jobId.data(), 32); + memcpy(static_cast(reply.data()) + 32, result.out.data(), + result.out.size()); + sender.send(reply, zmq::send_flags::none); } return 0; }