Merge pull request 'Roundtrip with ZeroMQ' (#1) from zmq into master
continuous-integration/drone/push Build is passing Details

Reviewed-on: #1
pull/2/head
Bastien OLLIER 2 years ago
commit 169323098d

@ -2,3 +2,4 @@
BasedOnStyle: LLVM BasedOnStyle: LLVM
IndentWidth: 4 IndentWidth: 4
FixNamespaceComments: false FixNamespaceComments: false
ColumnLimit: 100000

@ -6,5 +6,5 @@ steps:
- name: build - name: build
image: hub.codefirst.iut.uca.fr/clement.freville2/planificador-build-image:latest image: hub.codefirst.iut.uca.fr/clement.freville2/planificador-build-image:latest
commands: commands:
- mkdir build && cd build && cmake .. - cmake -B build -S .
- make - cmake --build build --parallel $(nproc)

4
.gitignore vendored

@ -1,3 +1,7 @@
bin bin
build build
obj obj
.idea
.vscode
cmake-build-debug

@ -1,19 +1,51 @@
cmake_minimum_required(VERSION 3.10) cmake_minimum_required(VERSION 3.14)
set(CMAKE_POLICY_DEFAULT_CMP0077 NEW)
project(planificador) project(planificador)
add_compile_options(-Wall -Wextra -pedantic)
include(FetchContent) include(FetchContent)
FetchContent_Declare( FetchContent_Declare(
tomlplusplus tomlplusplus
GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git
GIT_TAG v3.3.0 GIT_TAG v3.3.0
) )
FetchContent_MakeAvailable(tomlplusplus) FetchContent_MakeAvailable(tomlplusplus)
file(GLOB nameFile "src/*.cpp") list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/libzmq-pkg-config)
add_executable(planificador src/host.cpp src/runner.cpp src/config.cpp src/main.cpp ) find_package(ZeroMQ QUIET)
add_executable(planificador src/host.cpp src/runner.cpp src/network.cpp src/config.cpp src/main.cpp)
if (NOT ZeroMQ_FOUND)
message(STATUS "ZeroMQ not found, using bundled version")
FetchContent_Declare(
zmq
GIT_REPOSITORY https://github.com/zeromq/libzmq.git
GIT_TAG v4.3.5
)
FetchContent_Declare(
cppzmq
GIT_REPOSITORY https://github.com/zeromq/cppzmq.git
GIT_TAG v4.10.0
)
set(WITH_PERF_TOOL OFF)
set(ENABLE_CPACK OFF)
set(ZMQ_BUILD_TESTS OFF)
set(WITH_TLS OFF)
set(WITH_DOC OFF)
set(BUILD_STATIC OFF)
set(BUILD_SHARED ON)
set(ZMQ_BUILD_TESTS OFF)
set(CPPZMQ_BUILD_TESTS OFF)
FetchContent_MakeAvailable(zmq cppzmq)
include_directories(${zmq_SOURCE_DIR}/include)
include_directories(${cppzmq_SOURCE_DIR})
target_link_libraries(planificador PRIVATE libzmq)
else ()
target_link_libraries(planificador PRIVATE zmq)
endif ()
target_compile_options(planificador PUBLIC -Wall -Wextra -pedantic)
target_compile_features(planificador PUBLIC cxx_std_17) target_compile_features(planificador PUBLIC cxx_std_17)
include_directories(${tomlplusplus_SOURCE_DIR}/include) include_directories(${tomlplusplus_SOURCE_DIR}/include)

@ -0,0 +1,28 @@
# Adapted from https://github.com/zeromq/cppzmq/blob/master/CMakeLists.txt
set(PKG_CONFIG_USE_CMAKE_PREFIX_PATH ON)
find_package(PkgConfig)
pkg_check_modules(PC_LIBZMQ QUIET libzmq)
set(ZeroMQ_VERSION ${PC_LIBZMQ_VERSION})
find_path(ZeroMQ_INCLUDE_DIR zmq.h
PATHS ${ZeroMQ_DIR}/include
${PC_LIBZMQ_INCLUDE_DIRS})
find_library(ZeroMQ_LIBRARY
NAMES zmq
PATHS ${ZeroMQ_DIR}/lib
${PC_LIBZMQ_LIBDIR}
${PC_LIBZMQ_LIBRARY_DIRS})
if(ZeroMQ_LIBRARY)
set(ZeroMQ_FOUND ON)
endif()
set ( ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY} )
set ( ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} )
include ( FindPackageHandleStandardArgs )
# handle the QUIETLY and REQUIRED arguments and set ZMQ_FOUND to TRUE
# if all listed variables are TRUE
find_package_handle_standard_args ( ZeroMQ DEFAULT_MSG ZeroMQ_LIBRARIES ZeroMQ_INCLUDE_DIRS )

@ -17,11 +17,9 @@ std::vector<sk::host> sk::config::loadHostsFromToml(const fs::path &confFile) {
} }
toml::table &serverInfo = *serverInfoPtr; toml::table &serverInfo = *serverInfoPtr;
if (serverInfo.get_as<std::string>("ip") && if (serverInfo.get_as<std::string>("ip") && serverInfo.get_as<int64_t>("nbContainerMax")) {
serverInfo.get_as<int64_t>("nbContainerMax")) {
std::string serverIp = serverInfo.get_as<std::string>("ip")->get(); std::string serverIp = serverInfo.get_as<std::string>("ip")->get();
int serverMaxContainers = int serverMaxContainers = serverInfo.get_as<int64_t>("nbContainerMax")->get();
serverInfo.get_as<int64_t>("nbContainerMax")->get();
hosts.push_back(sk::host(serverIp, serverMaxContainers)); hosts.push_back(sk::host(serverIp, serverMaxContainers));
} }

@ -1,23 +1,16 @@
#include "host.hpp" #include "host.hpp"
namespace sk { namespace sk {
host::host(const std::string &ip, unsigned int connectionsMax) host::host(const std::string &ip, unsigned int connectionsMax) : ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {}
: ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {}
void host::addConnection(sk::runner& runner) { void host::addConnection(sk::runner &runner) {
runners.push(runner); runners.push(runner);
connections += 1; connections += 1;
} }
const std::string& host::getIp() const { const std::string &host::getIp() const { return ip; }
return ip;
}
unsigned int host::getNbConnections() const { unsigned int host::getNbConnections() const { return connections; }
return connections;
}
unsigned int host::getNbConnectionsMax() const { unsigned int host::getNbConnectionsMax() const { return connectionsMax; }
return connectionsMax;
}
} }

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <string> #include "runner.hpp"
#include <queue> #include <queue>
#include "runner.hpp" #include <string>
namespace sk { namespace sk {
class host { class host {
@ -15,16 +15,13 @@ class host {
public: public:
host(const std::string &ip, unsigned int connectionsMax); host(const std::string &ip, unsigned int connectionsMax);
void addConnection(sk::runner& runner); void addConnection(sk::runner &runner);
const std::string &getIp() const; const std::string &getIp() const;
unsigned int getNbConnections() const; unsigned int getNbConnections() const;
unsigned int getNbConnectionsMax() const; unsigned int getNbConnectionsMax() const;
bool operator<(const host &other) const { bool operator<(const host &other) const { return (connectionsMax - connections) < other.getNbConnectionsMax() - other.getNbConnections(); }
return (connectionsMax - connections) <
other.getNbConnectionsMax() - other.getNbConnections();
}
}; };
} }

@ -1,4 +1,5 @@
#include "config.hpp" #include "config.hpp"
#include "network.hpp"
#include "program.hpp" #include "program.hpp"
#include "runner.hpp" #include "runner.hpp"
#include <filesystem> #include <filesystem>
@ -11,14 +12,15 @@
#include <wait.h> #include <wait.h>
#include "host.hpp" #include "host.hpp"
#include "zmq_addon.hpp"
namespace fs = std::filesystem; static constexpr uint32_t JOB_ID_LEN = 32;
static constexpr uint32_t MIN_MESSAGE_LEN = JOB_ID_LEN + sizeof(uint32_t) * 2;
sk::runner_backend detect_backend() { sk::runner_backend detect_backend() {
const char *const argv[] = {"docker", "stats", "--no-stream", nullptr}; const char *const argv[] = {"docker", "stats", "--no-stream", nullptr};
pid_t pid; pid_t pid;
if (posix_spawnp(&pid, argv[0], nullptr, nullptr, if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast<char *const *>(argv), nullptr) != 0) {
const_cast<char *const *>(argv), nullptr) != 0) {
return sk::runner_backend::BubbleWrap; return sk::runner_backend::BubbleWrap;
} }
int status = 0; int status = 0;
@ -35,20 +37,51 @@ int main() {
std::vector<sk::host> listsHosts = sk::config::loadHostsFromToml("../conf.toml"); std::vector<sk::host> listsHosts = sk::config::loadHostsFromToml("../conf.toml");
std::priority_queue<sk::host> hosts(listsHosts.begin(), listsHosts.end()); std::priority_queue<sk::host> hosts(listsHosts.begin(), listsHosts.end());
std::queue<sk::program> programQueue;
if (hosts.empty()) { if (hosts.empty()) {
std::cerr << "Pas de host" << std::endl; std::cerr << "Pas de host" << std::endl;
exit(1);
} }
std::queue<sk::program> queue;
queue.push(sk::program{"echo $(( 1 + 2 ))", "ghcr.io/moshell-lang/moshell:master"});
sk::runner runner(detect_backend()); sk::runner runner(detect_backend());
while (!queue.empty()) {
const sk::program &current = queue.front(); zmq::context_t context(1);
sk::run_result res = runner.run_blocking(current); zmq::socket_t receiver(context, zmq::socket_type::pull);
std::cout << "out: " << res.out << "\n"; receiver.connect("tcp://localhost:5557");
std::cout << "err: " << res.err << "\n"; zmq::socket_t sender(context, zmq::socket_type::push);
queue.pop(); sender.connect("tcp://localhost:5558");
while (true) {
zmq::message_t request;
receiver.recv(request);
if (request.size() < MIN_MESSAGE_LEN) {
std::cerr << "Invalid request" << std::endl;
break;
}
std::string_view jobId(static_cast<char *>(request.data()), JOB_ID_LEN);
uint32_t imageLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN);
uint32_t codeLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN + sizeof(uint32_t));
if (request.size() < MIN_MESSAGE_LEN + imageLen + codeLen) {
std::cerr << "Invalid request" << std::endl;
break;
}
std::string imageString(static_cast<char *>(request.data()) + MIN_MESSAGE_LEN, imageLen);
std::string requestString(static_cast<char *>(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen);
std::cout << "Executing " << codeLen << " bytes code.\n";
sk::program program{requestString, imageString};
sk::run_result result = runner.run_blocking(program);
std::cout << "Result: " << result.out << std::endl;
// Send the job id and result.out to sink
zmq::message_t reply(JOB_ID_LEN + result.out.size());
memcpy(reply.data(), jobId.data(), JOB_ID_LEN);
memcpy(static_cast<char *>(reply.data()) + JOB_ID_LEN, result.out.data(), result.out.size());
sender.send(reply, zmq::send_flags::none);
} }
return 0; return 0;
} }

@ -0,0 +1,5 @@
#include "network.hpp"
namespace sk {
uint32_t read_uint32(const char *buffer) { return static_cast<uint32_t>(buffer[3]) | static_cast<uint32_t>(buffer[2]) << 8 | static_cast<uint32_t>(buffer[1]) << 16 | static_cast<uint32_t>(buffer[0]) << 24; }
}

@ -0,0 +1,7 @@
#pragma once
#include <cstdint>
namespace sk {
uint32_t read_uint32(const char *buffer);
}

@ -30,33 +30,15 @@ run_result runner::run_blocking(const program &program) {
posix_spawn_file_actions_addclose(&actions, in_pipe[0]); posix_spawn_file_actions_addclose(&actions, in_pipe[0]);
posix_spawn_file_actions_addclose(&actions, out_pipe[1]); posix_spawn_file_actions_addclose(&actions, out_pipe[1]);
posix_spawn_file_actions_addclose(&actions, err_pipe[1]); posix_spawn_file_actions_addclose(&actions, err_pipe[1]);
const char *const docker_args[] = {"docker", const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr};
"run", const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "/bin/sh", nullptr};
"--rm",
"-i",
"--pull=never",
"--cap-drop=ALL",
"--network=none",
"--memory=64m",
"--memory-swap=64m",
"--pids-limit=128",
program.image.c_str(),
nullptr};
const char *const bwrap_args[] = {
"bwrap", "--ro-bind", "/usr", "/usr", "--dir",
"/tmp", "--dir", "/var", "--proc", "/proc",
"--dev", "/dev", "--symlink", "usr/lib", "/lib",
"--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin",
"/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all",
"/bin/sh", nullptr};
const char *const *args = docker_args; const char *const *args = docker_args;
if (backend == runner_backend::BubbleWrap) { if (backend == runner_backend::BubbleWrap) {
args = bwrap_args; args = bwrap_args;
} }
pid_t pid; pid_t pid;
int exit_code; int exit_code;
if (posix_spawnp(&pid, args[0], &actions, nullptr, if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast<char *const *>(args), nullptr) != 0) {
const_cast<char *const *>(args), nullptr) != 0) {
throw std::system_error{errno, std::generic_category()}; throw std::system_error{errno, std::generic_category()};
} }
@ -70,19 +52,16 @@ run_result runner::run_blocking(const program &program) {
std::array<char, 1024> buffer{}; std::array<char, 1024> buffer{};
std::string out; std::string out;
std::string err; std::string err;
std::array<pollfd, 2> plist = {pollfd{out_pipe[0], POLLIN, 0}, std::array<pollfd, 2> plist = {pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}};
pollfd{err_pipe[0], POLLIN, 0}};
while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) { while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) {
if (plist[0].revents & POLLIN) { if (plist[0].revents & POLLIN) {
ssize_t bytes_read = ssize_t bytes_read = read(out_pipe[0], buffer.data(), buffer.size());
read(out_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) { if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()}; throw std::system_error{errno, std::generic_category()};
} }
out.append(buffer.data(), bytes_read); out.append(buffer.data(), bytes_read);
} else if (plist[1].revents & POLLIN) { } else if (plist[1].revents & POLLIN) {
ssize_t bytes_read = ssize_t bytes_read = read(err_pipe[0], buffer.data(), buffer.size());
read(err_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) { if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()}; throw std::system_error{errno, std::generic_category()};
} }

Loading…
Cancel
Save