diff --git a/.clang-format b/.clang-format index 37743a5..5b16361 100644 --- a/.clang-format +++ b/.clang-format @@ -2,3 +2,4 @@ BasedOnStyle: LLVM IndentWidth: 4 FixNamespaceComments: false +ColumnLimit: 100000 \ No newline at end of file diff --git a/.drone.yml b/.drone.yml index 3e29fca..ce1a964 100644 --- a/.drone.yml +++ b/.drone.yml @@ -6,5 +6,5 @@ steps: - name: build image: hub.codefirst.iut.uca.fr/clement.freville2/planificador-build-image:latest commands: - - mkdir build && cd build && cmake .. - - make + - cmake -B build -S . + - cmake --build build --parallel $(nproc) diff --git a/.gitignore b/.gitignore index 3b1c59a..6a848fb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ bin build obj + +.idea +.vscode +cmake-build-debug diff --git a/CMakeLists.txt b/CMakeLists.txt index a99602d..9ae1f0e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,19 +1,51 @@ -cmake_minimum_required(VERSION 3.10) +cmake_minimum_required(VERSION 3.14) +set(CMAKE_POLICY_DEFAULT_CMP0077 NEW) project(planificador) -add_compile_options(-Wall -Wextra -pedantic) - include(FetchContent) + FetchContent_Declare( - tomlplusplus - GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git - GIT_TAG v3.3.0 + tomlplusplus + GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git + GIT_TAG v3.3.0 ) FetchContent_MakeAvailable(tomlplusplus) -file(GLOB nameFile "src/*.cpp") -add_executable(planificador src/host.cpp src/runner.cpp src/config.cpp src/main.cpp ) +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/libzmq-pkg-config) +find_package(ZeroMQ QUIET) + +add_executable(planificador src/host.cpp src/runner.cpp src/network.cpp src/config.cpp src/main.cpp) +if (NOT ZeroMQ_FOUND) + message(STATUS "ZeroMQ not found, using bundled version") + FetchContent_Declare( + zmq + GIT_REPOSITORY https://github.com/zeromq/libzmq.git + GIT_TAG v4.3.5 + ) + FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq.git + GIT_TAG v4.10.0 + ) + set(WITH_PERF_TOOL OFF) + set(ENABLE_CPACK OFF) + set(ZMQ_BUILD_TESTS OFF) + set(WITH_TLS OFF) + set(WITH_DOC OFF) + set(BUILD_STATIC OFF) + set(BUILD_SHARED ON) + set(ZMQ_BUILD_TESTS OFF) + set(CPPZMQ_BUILD_TESTS OFF) + FetchContent_MakeAvailable(zmq cppzmq) + include_directories(${zmq_SOURCE_DIR}/include) + include_directories(${cppzmq_SOURCE_DIR}) + target_link_libraries(planificador PRIVATE libzmq) +else () + target_link_libraries(planificador PRIVATE zmq) +endif () + +target_compile_options(planificador PUBLIC -Wall -Wextra -pedantic) target_compile_features(planificador PUBLIC cxx_std_17) include_directories(${tomlplusplus_SOURCE_DIR}/include) diff --git a/libzmq-pkg-config/FindZeroMQ.cmake b/libzmq-pkg-config/FindZeroMQ.cmake new file mode 100644 index 0000000..892762c --- /dev/null +++ b/libzmq-pkg-config/FindZeroMQ.cmake @@ -0,0 +1,28 @@ +# Adapted from https://github.com/zeromq/cppzmq/blob/master/CMakeLists.txt +set(PKG_CONFIG_USE_CMAKE_PREFIX_PATH ON) +find_package(PkgConfig) +pkg_check_modules(PC_LIBZMQ QUIET libzmq) + +set(ZeroMQ_VERSION ${PC_LIBZMQ_VERSION}) + +find_path(ZeroMQ_INCLUDE_DIR zmq.h + PATHS ${ZeroMQ_DIR}/include + ${PC_LIBZMQ_INCLUDE_DIRS}) + +find_library(ZeroMQ_LIBRARY + NAMES zmq + PATHS ${ZeroMQ_DIR}/lib + ${PC_LIBZMQ_LIBDIR} + ${PC_LIBZMQ_LIBRARY_DIRS}) + +if(ZeroMQ_LIBRARY) + set(ZeroMQ_FOUND ON) +endif() + +set ( ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY} ) +set ( ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} ) + +include ( FindPackageHandleStandardArgs ) +# handle the QUIETLY and REQUIRED arguments and set ZMQ_FOUND to TRUE +# if all listed variables are TRUE +find_package_handle_standard_args ( ZeroMQ DEFAULT_MSG ZeroMQ_LIBRARIES ZeroMQ_INCLUDE_DIRS ) diff --git a/src/config.cpp b/src/config.cpp index 4b1909e..7a1c7a3 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -17,11 +17,9 @@ std::vector sk::config::loadHostsFromToml(const fs::path &confFile) { } toml::table &serverInfo = *serverInfoPtr; - if (serverInfo.get_as("ip") && - serverInfo.get_as("nbContainerMax")) { + if (serverInfo.get_as("ip") && serverInfo.get_as("nbContainerMax")) { std::string serverIp = serverInfo.get_as("ip")->get(); - int serverMaxContainers = - serverInfo.get_as("nbContainerMax")->get(); + int serverMaxContainers = serverInfo.get_as("nbContainerMax")->get(); hosts.push_back(sk::host(serverIp, serverMaxContainers)); } diff --git a/src/host.cpp b/src/host.cpp index 479281f..6b32fb8 100644 --- a/src/host.cpp +++ b/src/host.cpp @@ -1,23 +1,16 @@ #include "host.hpp" namespace sk { -host::host(const std::string &ip, unsigned int connectionsMax) - : ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {} +host::host(const std::string &ip, unsigned int connectionsMax) : ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {} -void host::addConnection(sk::runner& runner) { +void host::addConnection(sk::runner &runner) { runners.push(runner); connections += 1; } -const std::string& host::getIp() const { - return ip; -} +const std::string &host::getIp() const { return ip; } -unsigned int host::getNbConnections() const { - return connections; -} +unsigned int host::getNbConnections() const { return connections; } -unsigned int host::getNbConnectionsMax() const { - return connectionsMax; -} +unsigned int host::getNbConnectionsMax() const { return connectionsMax; } } diff --git a/src/host.hpp b/src/host.hpp index f5fe88f..a423692 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -1,8 +1,8 @@ #pragma once -#include +#include "runner.hpp" #include -#include "runner.hpp" +#include namespace sk { class host { @@ -15,16 +15,13 @@ class host { public: host(const std::string &ip, unsigned int connectionsMax); - void addConnection(sk::runner& runner); + void addConnection(sk::runner &runner); const std::string &getIp() const; unsigned int getNbConnections() const; unsigned int getNbConnectionsMax() const; - bool operator<(const host &other) const { - return (connectionsMax - connections) < - other.getNbConnectionsMax() - other.getNbConnections(); - } + bool operator<(const host &other) const { return (connectionsMax - connections) < other.getNbConnectionsMax() - other.getNbConnections(); } }; } diff --git a/src/main.cpp b/src/main.cpp index 7672bb8..7268c51 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,5 @@ #include "config.hpp" +#include "network.hpp" #include "program.hpp" #include "runner.hpp" #include @@ -11,14 +12,15 @@ #include #include "host.hpp" +#include "zmq_addon.hpp" -namespace fs = std::filesystem; +static constexpr uint32_t JOB_ID_LEN = 32; +static constexpr uint32_t MIN_MESSAGE_LEN = JOB_ID_LEN + sizeof(uint32_t) * 2; sk::runner_backend detect_backend() { const char *const argv[] = {"docker", "stats", "--no-stream", nullptr}; pid_t pid; - if (posix_spawnp(&pid, argv[0], nullptr, nullptr, - const_cast(argv), nullptr) != 0) { + if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast(argv), nullptr) != 0) { return sk::runner_backend::BubbleWrap; } int status = 0; @@ -35,20 +37,51 @@ int main() { std::vector listsHosts = sk::config::loadHostsFromToml("../conf.toml"); std::priority_queue hosts(listsHosts.begin(), listsHosts.end()); + std::queue programQueue; + if (hosts.empty()) { std::cerr << "Pas de host" << std::endl; + exit(1); } - std::queue queue; - queue.push(sk::program{"echo $(( 1 + 2 ))", "ghcr.io/moshell-lang/moshell:master"}); - sk::runner runner(detect_backend()); - while (!queue.empty()) { - const sk::program ¤t = queue.front(); - sk::run_result res = runner.run_blocking(current); - std::cout << "out: " << res.out << "\n"; - std::cout << "err: " << res.err << "\n"; - queue.pop(); + + zmq::context_t context(1); + zmq::socket_t receiver(context, zmq::socket_type::pull); + receiver.connect("tcp://localhost:5557"); + zmq::socket_t sender(context, zmq::socket_type::push); + sender.connect("tcp://localhost:5558"); + + while (true) { + zmq::message_t request; + receiver.recv(request); + if (request.size() < MIN_MESSAGE_LEN) { + std::cerr << "Invalid request" << std::endl; + break; + } + + std::string_view jobId(static_cast(request.data()), JOB_ID_LEN); + uint32_t imageLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN); + uint32_t codeLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN + sizeof(uint32_t)); + + if (request.size() < MIN_MESSAGE_LEN + imageLen + codeLen) { + std::cerr << "Invalid request" << std::endl; + break; + } + std::string imageString(static_cast(request.data()) + MIN_MESSAGE_LEN, imageLen); + std::string requestString(static_cast(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen); + + std::cout << "Executing " << codeLen << " bytes code.\n"; + sk::program program{requestString, imageString}; + sk::run_result result = runner.run_blocking(program); + + std::cout << "Result: " << result.out << std::endl; + + // Send the job id and result.out to sink + zmq::message_t reply(JOB_ID_LEN + result.out.size()); + memcpy(reply.data(), jobId.data(), JOB_ID_LEN); + memcpy(static_cast(reply.data()) + JOB_ID_LEN, result.out.data(), result.out.size()); + sender.send(reply, zmq::send_flags::none); } return 0; } diff --git a/src/network.cpp b/src/network.cpp new file mode 100644 index 0000000..d5d3a0c --- /dev/null +++ b/src/network.cpp @@ -0,0 +1,5 @@ +#include "network.hpp" + +namespace sk { +uint32_t read_uint32(const char *buffer) { return static_cast(buffer[3]) | static_cast(buffer[2]) << 8 | static_cast(buffer[1]) << 16 | static_cast(buffer[0]) << 24; } +} diff --git a/src/network.hpp b/src/network.hpp new file mode 100644 index 0000000..db7ab6c --- /dev/null +++ b/src/network.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include + +namespace sk { +uint32_t read_uint32(const char *buffer); +} diff --git a/src/runner.cpp b/src/runner.cpp index a3ad56d..f473c0c 100644 --- a/src/runner.cpp +++ b/src/runner.cpp @@ -30,33 +30,15 @@ run_result runner::run_blocking(const program &program) { posix_spawn_file_actions_addclose(&actions, in_pipe[0]); posix_spawn_file_actions_addclose(&actions, out_pipe[1]); posix_spawn_file_actions_addclose(&actions, err_pipe[1]); - const char *const docker_args[] = {"docker", - "run", - "--rm", - "-i", - "--pull=never", - "--cap-drop=ALL", - "--network=none", - "--memory=64m", - "--memory-swap=64m", - "--pids-limit=128", - program.image.c_str(), - nullptr}; - const char *const bwrap_args[] = { - "bwrap", "--ro-bind", "/usr", "/usr", "--dir", - "/tmp", "--dir", "/var", "--proc", "/proc", - "--dev", "/dev", "--symlink", "usr/lib", "/lib", - "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", - "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", - "/bin/sh", nullptr}; + const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr}; + const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "/bin/sh", nullptr}; const char *const *args = docker_args; if (backend == runner_backend::BubbleWrap) { args = bwrap_args; } pid_t pid; int exit_code; - if (posix_spawnp(&pid, args[0], &actions, nullptr, - const_cast(args), nullptr) != 0) { + if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast(args), nullptr) != 0) { throw std::system_error{errno, std::generic_category()}; } @@ -70,19 +52,16 @@ run_result runner::run_blocking(const program &program) { std::array buffer{}; std::string out; std::string err; - std::array plist = {pollfd{out_pipe[0], POLLIN, 0}, - pollfd{err_pipe[0], POLLIN, 0}}; + std::array plist = {pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}}; while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) { if (plist[0].revents & POLLIN) { - ssize_t bytes_read = - read(out_pipe[0], buffer.data(), buffer.size()); + ssize_t bytes_read = read(out_pipe[0], buffer.data(), buffer.size()); if (bytes_read == -1) { throw std::system_error{errno, std::generic_category()}; } out.append(buffer.data(), bytes_read); } else if (plist[1].revents & POLLIN) { - ssize_t bytes_read = - read(err_pipe[0], buffer.data(), buffer.size()); + ssize_t bytes_read = read(err_pipe[0], buffer.data(), buffer.size()); if (bytes_read == -1) { throw std::system_error{errno, std::generic_category()}; }