Compare commits


5 Commits
master ... rpc

Author | SHA1 | Message | CI (continuous-integration/drone/push) | Date
Bastien OLLIER | e634bc9f48 | commit | Build is passing | 2 years ago
Bastien OLLIER | 061135e2b3 | add loop | Build is passing | 2 years ago
Bastien OLLIER | b027ac07ec | add main loop | Build is failing | 2 years ago
Clément FRÉVILLE | 373af88ceb | Fix rpclib build | Build is passing | 2 years ago
Bastien OLLIER | 858860b9ae | test rpc | Build is failing | 2 years ago

.clang-format
@@ -2,4 +2,3 @@
BasedOnStyle: LLVM
IndentWidth: 4
FixNamespaceComments: false
ColumnLimit: 100000

.dockerignore
@@ -1,11 +0,0 @@
.git
github
.vscode
.idea
build
cmake-build-debug
.clang-format
.drone.yml
.dockerignore
Dockerfile

.drone.yml
@@ -6,5 +6,5 @@ steps:
- name: build
image: hub.codefirst.iut.uca.fr/clement.freville2/planificador-build-image:latest
commands:
- cmake -B build -S .
- cmake --build build --parallel $(nproc)
- mkdir build && cd build && cmake ..
- make

.gitignore (vendored, 4 lines changed)

@@ -1,7 +1,3 @@
bin
build
obj
.idea
.vscode
cmake-build-debug

CMakeLists.txt
@@ -1,51 +1,32 @@
cmake_minimum_required(VERSION 3.14)
set(CMAKE_POLICY_DEFAULT_CMP0077 NEW)
cmake_minimum_required(VERSION 3.10)
project(planificador)
add_compile_options(-Wall -Wextra -pedantic)
include(FetchContent)
FetchContent_Declare(
tomlplusplus
GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git
GIT_TAG v3.3.0
rpclib
GIT_REPOSITORY https://github.com/rpclib/rpclib.git
GIT_TAG master
)
FetchContent_MakeAvailable(rpclib)
FetchContent_Declare(
tomlplusplus
GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git
GIT_TAG v3.3.0
)
FetchContent_MakeAvailable(tomlplusplus)
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/libzmq-pkg-config)
find_package(ZeroMQ QUIET)
add_executable(planificador src/runner.cpp src/network.cpp src/config.cpp src/main.cpp)
if (NOT ZeroMQ_FOUND)
message(STATUS "ZeroMQ not found, using bundled version")
FetchContent_Declare(
zmq
GIT_REPOSITORY https://github.com/zeromq/libzmq.git
GIT_TAG v4.3.5
)
FetchContent_Declare(
cppzmq
GIT_REPOSITORY https://github.com/zeromq/cppzmq.git
GIT_TAG v4.10.0
)
set(WITH_PERF_TOOL OFF)
set(ENABLE_CPACK OFF)
set(ZMQ_BUILD_TESTS OFF)
set(WITH_TLS OFF)
set(WITH_DOC OFF)
set(BUILD_STATIC OFF)
set(BUILD_SHARED ON)
set(ZMQ_BUILD_TESTS OFF)
set(CPPZMQ_BUILD_TESTS OFF)
FetchContent_MakeAvailable(zmq cppzmq)
include_directories(${zmq_SOURCE_DIR}/include)
include_directories(${cppzmq_SOURCE_DIR})
target_link_libraries(planificador PRIVATE libzmq)
else ()
target_link_libraries(planificador PRIVATE zmq)
endif ()
target_compile_options(planificador PUBLIC -Wall -Wextra -pedantic)
add_executable(planificador src/host.cpp src/runner.cpp src/config.cpp src/main.cpp)
target_compile_features(planificador PUBLIC cxx_std_17)
include_directories(${tomlplusplus_SOURCE_DIR}/include)
include_directories(${rpclib_SOURCE_DIR}/include)
SET(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(planificador PRIVATE Threads::Threads)
target_link_libraries(planificador PRIVATE rpc)

Dockerfile
@@ -1,16 +0,0 @@
FROM alpine:3.18 as builder
WORKDIR /app
RUN apk add --no-cache build-base git zeromq-dev cmake \
&& apk add --no-cache cppzmq --repository=https://dl-cdn.alpinelinux.org/alpine/edge/testing
COPY . .
RUN cmake -B build -S . && cmake --build build --parallel $(nproc)
FROM alpine:3.18 as runner
RUN apk add --no-cache libc++ zeromq
COPY --from=builder /app/build/planificador /usr/local/bin/planificador
ENTRYPOINT ["/usr/local/bin/planificador"]

README.md
@@ -1,53 +0,0 @@
planificador
===========
A sandbox execution environment for untrusted code. It acts as a front-end to Docker and Bubblewrap.
Tasks are submitted using a ZeroMQ message queue, allowing quick scaling of the system.
Protocol
--------
*planificador* receives messages from a ZeroMQ queue; all messages use a binary, big-endian format.
Executor bound
--------------
The first byte of the message is the message type. The following types are supported:
* `0x00`: `SUBMIT` - Submit a new task to the system.
* `0x02`: `CANCEL` - Cancel a task.
The following bytes are the payload of the message. The format of the payload depends on the message type.
### SUBMIT
- 32 bytes: Task ID
- 4 bytes: Image field length
- 4 bytes: Code length
- Image field length bytes: Image field
- Code length bytes: Code
### CANCEL
- 32 bytes: Task ID
Client bound
------------
The first byte of the message is the message type. The following types are supported:
* `0x01`: `APPEND_OUT` - Append text to the task's stdout.
* `0x02`: `APPEND_ERR` - Append text to the task's stderr.
* `0x03`: `EXITED` - The task has exited.
### APPEND_OUT / APPEND_ERR
- 32 bytes: Task ID
- 4 bytes: Text length
- Text length bytes: Text
### EXITED
- 32 bytes: Task ID
- 4 bytes: Exit code
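For illustration only (this sketch is not part of the repository), an executor-bound SUBMIT frame following the layout above could be assembled like this; the helper name `build_submit` is invented here:

```cpp
#include <cstdint>
#include <string>

// Build a SUBMIT frame: type byte, 32-byte task ID, two big-endian lengths,
// then the image field and the code.
std::string build_submit(const std::string &task_id, // expected to be exactly 32 bytes
                         const std::string &image,
                         const std::string &code) {
    auto put_u32 = [](std::string &buf, uint32_t v) {
        buf.push_back(static_cast<char>(v >> 24));
        buf.push_back(static_cast<char>(v >> 16));
        buf.push_back(static_cast<char>(v >> 8));
        buf.push_back(static_cast<char>(v));
    };
    std::string msg;
    msg.push_back('\x00');                             // 0x00: SUBMIT
    msg += task_id;                                    // task ID
    put_u32(msg, static_cast<uint32_t>(image.size())); // image field length
    put_u32(msg, static_cast<uint32_t>(code.size()));  // code length
    msg += image;                                      // image field
    msg += code;                                       // code
    return msg;
}
```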

conf.toml
@@ -0,0 +1,12 @@
[serveurs]
[serveurs.serveur1]
ip = '100.000.000.000'
nbContainerMax = 10
[serveurs.serveur2]
ip = '200.000.000.000'
nbContainerMax = 20
[serveurs.serveur3]
ip = '300.000.000.000'
nbContainerMax = 30
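The addresses above are placeholders rather than valid IPv4 addresses. Below is a minimal sketch of consuming this file through the new loader declared in src/config.hpp; the relative path and the printed fields are assumptions based on src/config.cpp further down:

```cpp
#include "config.hpp"
#include <iostream>

int main() {
    // Each [serveurs.*] entry becomes one sk::host with its ip and nbContainerMax.
    for (const sk::host &h : sk::config::loadHostsFromToml("conf.toml")) {
        std::cout << h.getIp() << " -> up to " << h.getNbConnectionsMax()
                  << " containers\n";
    }
    return 0;
}
```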

config.toml
@@ -1,24 +0,0 @@
[queue]
pull = "tcp://localhost:5557"
push = "tcp://localhost:5558"
threads = 1
[runner]
timeout = 2
[runners.javascript]
type = "bubblewrap"
args = ["node", "-e", "{}"]
[runners.bun]
type = "bubblewrap"
args = ["bun", "run", "/dev/stdin"]
[runners.bash]
type = "bubblewrap"
args = ["bash", "-c", "{}"]
[runners.moshell]
type = "docker"
image = "ghcr.io/moshell-lang/moshell:master"
args = ["moshell", "-c", "{}"]

libzmq-pkg-config/FindZeroMQ.cmake
@@ -1,28 +0,0 @@
# Adapted from https://github.com/zeromq/cppzmq/blob/master/CMakeLists.txt
set(PKG_CONFIG_USE_CMAKE_PREFIX_PATH ON)
find_package(PkgConfig)
pkg_check_modules(PC_LIBZMQ QUIET libzmq)
set(ZeroMQ_VERSION ${PC_LIBZMQ_VERSION})
find_path(ZeroMQ_INCLUDE_DIR zmq.h
PATHS ${ZeroMQ_DIR}/include
${PC_LIBZMQ_INCLUDE_DIRS})
find_library(ZeroMQ_LIBRARY
NAMES zmq
PATHS ${ZeroMQ_DIR}/lib
${PC_LIBZMQ_LIBDIR}
${PC_LIBZMQ_LIBRARY_DIRS})
if(ZeroMQ_LIBRARY)
set(ZeroMQ_FOUND ON)
endif()
set ( ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY} )
set ( ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} )
include ( FindPackageHandleStandardArgs )
# handle the QUIETLY and REQUIRED arguments and set ZMQ_FOUND to TRUE
# if all listed variables are TRUE
find_package_handle_standard_args ( ZeroMQ DEFAULT_MSG ZeroMQ_LIBRARIES ZeroMQ_INCLUDE_DIRS )

src/config.cpp
@@ -1,76 +1,31 @@
#include "config.hpp"
#include <iostream>
#include <toml++/toml.h>
std::vector<sk::host> sk::config::loadHostsFromToml(const fs::path &confFile) {
std::vector<sk::host> hosts;
static sk::config read(const toml::table &config) {
std::unordered_map<std::string, sk::runner_strategy_config> strategies;
auto *table = config["runners"].as_table();
if (table != nullptr) {
for (auto [name, backend_config] : *table) {
auto *backend_table = backend_config.as_table();
if (backend_table == nullptr) {
std::cerr << "Invalid runner config for " << name << ": expected table\n";
continue;
}
auto *backend_type = backend_table->get_as<std::string>("type");
if (backend_type == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing type field\n";
continue;
}
auto *args = backend_table->get_as<toml::array>("args");
if (args == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing args field\n";
continue;
}
std::vector<std::string> args_vector;
for (const auto &arg : *args) {
auto *arg_string = arg.as_string();
if (arg_string == nullptr) {
std::cerr << "Invalid runner config for " << name << ": args must be strings\n";
continue;
}
args_vector.emplace_back(*arg_string);
}
if (*backend_type == "docker") {
auto *image = backend_table->get_as<std::string>("image");
if (image == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing image field\n";
continue;
}
strategies.emplace(name, sk::docker_config{{std::move(args_vector)}, std::string{*image}});
} else if (*backend_type == "bubblewrap") {
strategies.emplace(name, sk::bubblewrap_config{std::move(args_vector)});
} else {
std::cerr << "Invalid runner config for " << name << ": unknown type " << *backend_type << "\n";
}
}
auto config = toml::parse_file(confFile.string());
toml::table *serverSection = config.get_as<toml::table>("serveurs");
if (serverSection == nullptr) {
return hosts;
}
return sk::config{
sk::queue_config{
config["queue"]["pull"].value_or("tcp://localhost:5557"),
config["queue"]["push"].value_or("tcp://localhost:5558"),
config["queue"]["threads"].value_or(1u),
},
sk::runner_config{
config["runner"]["timeout"].value_or(2u),
std::move(strategies),
},
};
}
sk::config sk::config::read_or_default(const std::filesystem::path &path, bool expect_present) {
std::ifstream t(path);
if (!t) {
if (errno == ENOENT && !expect_present) {
std::cout << "Using default config\n";
} else {
std::cerr << "Failed to open config file " << path << ": " << strerror(errno) << "\n";
// Iterate over the configured servers
for (const auto &[serverNameKey, serverInfoValue] : *serverSection) {
std::string serverName{serverNameKey};
toml::table *serverInfoPtr = serverInfoValue.as_table();
if (serverInfoPtr == nullptr) {
continue; // skip malformed server entries instead of dereferencing a null pointer
}
toml::table &serverInfo = *serverInfoPtr;
if (serverInfo.get_as<std::string>("ip") &&
serverInfo.get_as<int64_t>("nbContainerMax")) {
std::string serverIp = serverInfo.get_as<std::string>("ip")->get();
int serverMaxContainers =
serverInfo.get_as<int64_t>("nbContainerMax")->get();
hosts.push_back(sk::host(serverIp, serverMaxContainers));
}
return read(toml::table{});
}
std::stringstream buffer;
buffer << t.rdbuf();
auto config = toml::parse(buffer.str(), path);
return read(config);
return hosts;
}

src/config.hpp
@@ -1,37 +1,15 @@
#pragma once
#include <filesystem>
#include <string>
#include <unordered_map>
#include <variant>
#include <toml++/toml.h>
#include <vector>
namespace sk {
struct queue_config {
std::string pull_addr;
std::string push_addr;
unsigned int nb_threads;
};
struct bubblewrap_config {
std::vector<std::string> args;
};
struct docker_config : bubblewrap_config {
std::string image;
};
#include "host.hpp"
namespace fs = std::filesystem;
using runner_strategy_config = std::variant<bubblewrap_config, docker_config>;
struct runner_config {
unsigned int timeout;
std::unordered_map<std::string, runner_strategy_config> strategies;
};
struct config {
queue_config queue;
runner_config runner;
static config read_or_default(const std::filesystem::path &path, bool expect_present = false);
namespace sk {
class config {
public:
static std::vector<sk::host> loadHostsFromToml(const fs::path &confFile);
};
}
}

src/host.cpp
@@ -0,0 +1,17 @@
#include "host.hpp"
namespace sk {
host::host(const std::string &ip, unsigned int connectionsMax)
: ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {}
void host::addConnection(sk::runner &runner) {
runners.push(runner);
connections += 1;
}
const std::string &host::getIp() const { return ip; }
unsigned int host::getNbConnections() const { return connections; }
unsigned int host::getNbConnectionsMax() const { return connectionsMax; }
}

src/host.hpp
@@ -0,0 +1,30 @@
#pragma once
#include "runner.hpp"
#include <queue>
#include <string>
namespace sk {
class host {
std::string ip;
unsigned int connections;
unsigned int connectionsMax;
std::queue<sk::runner> runners;
public:
host(const std::string &ip, unsigned int connectionsMax);
void addConnection(sk::runner &runner);
const std::string &getIp() const;
unsigned int getNbConnections() const;
unsigned int getNbConnectionsMax() const;
bool operator<(const host &other) const {
return (connectionsMax - connections) <
other.getNbConnectionsMax() - other.getNbConnections();
}
};
}
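The comparison orders hosts by remaining capacity (connectionsMax - connections), so the std::priority_queue built in src/main.cpp keeps the host with the most free slots on top. A small illustrative check, assuming sk::host stays copy-constructible as it is used there:

```cpp
#include "host.hpp"
#include <iostream>
#include <queue>

int main() {
    std::priority_queue<sk::host> hosts;
    hosts.push(sk::host("10.0.0.1", 5));       // 5 free slots
    hosts.push(sk::host("10.0.0.2", 30));      // 30 free slots
    std::cout << hosts.top().getIp() << "\n";  // prints 10.0.0.2, the least-loaded host
    return 0;
}
```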

src/main.cpp
@@ -1,34 +1,36 @@
#include "network.hpp"
#include "program.hpp"
#include "runner.hpp"
#include <csignal>
#include <filesystem>
#include <iostream>
#include <queue>
#include <spawn.h>
#include <sys/wait.h>
#include <string_view>
#include <toml++/toml.h>
#include <unistd.h>
#include <vector>
#include <wait.h>
#include <thread>
#include "host.hpp"
#include "rpc/server.h"
#include "rpc/this_handler.h"
#include "config.hpp"
#include "zmq_addon.hpp"
#include "program.hpp"
#include "runner.hpp"
#include <chrono>
static constexpr uint32_t MIN_SUBMIT_MESSAGE_LEN = sk::JOB_ID_LEN + sizeof(uint32_t) * 2;
static constexpr uint32_t MIN_CANCEL_MESSAGE_LEN = sk::JOB_ID_LEN + sizeof(uint32_t);
static constexpr int SUBMIT_EXECUTOR_BOUND = 0;
static constexpr int CANCEL_EXECUTOR_BOUND = 1;
static constexpr int STDOUT_CLIENT_BOUND = 1;
static constexpr int STDERR_CLIENT_BOUND = 2;
static constexpr int EXIT_CLIENT_BOUND = 3;
namespace fs = std::filesystem;
sk::runner_backend detect_backend() {
const char *const argv[] = {"docker", "stats", "--no-stream", nullptr};
pid_t pid;
if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast<char *const *>(argv), nullptr) != 0) {
if (posix_spawnp(&pid, argv[0], nullptr, nullptr,
const_cast<char *const *>(argv), nullptr) != 0) {
return sk::runner_backend::BubbleWrap;
}
int status = 0;
waitpid(pid, &status, 0);
std::cout << status << std::endl;
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
std::cerr << "Using Docker" << std::endl;
return sk::runner_backend::Docker;
@@ -36,142 +38,53 @@ sk::runner_backend detect_backend() {
return sk::runner_backend::BubbleWrap;
}
sk::runner_list *global_runners = nullptr;
int main(int argc, char **argv) {
int opt;
std::optional<sk::runner_backend> selected_backend;
std::filesystem::path config_path("config.toml");
bool may_use_default_config = true;
while ((opt = getopt(argc, argv, "bc:d")) != -1) {
switch (opt) {
case 'b':
selected_backend = sk::runner_backend::BubbleWrap;
break;
case 'c':
config_path = optarg;
may_use_default_config = false;
break;
case 'd':
selected_backend = sk::runner_backend::Docker;
break;
default:
std::cerr << "Usage: %s [-bc] [script]\n";
return 1;
}
void mainLoop(std::queue<sk::program> &programLists) {
while (true) {
auto now = std::chrono::system_clock::now();
std::time_t now_time = std::chrono::system_clock::to_time_t(now);
std::cout << std::ctime(&now_time) << " " << programLists.size() << std::endl;
if (programLists.size() == 1)
exit(0);
}
}
sk::config config = sk::config::read_or_default(config_path, !may_use_default_config);
sk::runner_list runners(selected_backend.has_value() ? selected_backend.value() : detect_backend(), config.runner);
global_runners = &runners;
struct sigaction action {};
action.sa_handler = [](int) {
if (global_runners) {
global_runners->exit_active_jobs();
}
};
sigaction(SIGINT, &action, nullptr);
sigaction(SIGTERM, &action, nullptr);
if (optind < argc) {
std::ifstream t(argv[optind]);
if (!t) {
std::cerr << "Unable to open file " << argv[optind] << std::endl;
return 1;
}
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{"sample-code", code, "moshell"};
sk::runner *runner = runners.find_runner_for(program);
if (runner == nullptr) {
std::cerr << "No runner found for " << program.image << std::endl;
return 1;
}
sk::run_result result = runner->run_blocking(program);
std::cout << "exited with: " << result.exit_code << "\n";
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
return 0;
int main() {
std::vector<sk::host> listsHosts = sk::config::loadHostsFromToml("../conf.toml");
std::priority_queue<sk::host> hosts(listsHosts.begin(), listsHosts.end());
std::queue<sk::program> programLists;
if (hosts.empty()) {
std::cerr << "Pas de host" << std::endl;
exit(1);
}
zmq::context_t context(static_cast<int>(config.queue.nb_threads));
zmq::socket_t receiver(context, zmq::socket_type::pull);
receiver.connect(config.queue.pull_addr);
zmq::socket_t sender(context, zmq::socket_type::push);
sender.connect(config.queue.push_addr);
auto send = [&sender](int type, const std::string &jobId, const std::string &text) {
#ifndef NDEBUG
std::cout << "Result: `" << text << "`\n";
#endif
auto [reply, reply_bytes] = sk::prepare_headers(sizeof(uint32_t) + text.size(), type, jobId);
sk::write_string(reply_bytes, text);
sender.send(reply, zmq::send_flags::none);
};
while (true) {
zmq::message_t request;
zmq::recv_result_t _ = receiver.recv(request);
const auto *message = static_cast<const char *>(request.data()) + 1;
auto *message_bytes = static_cast<const std::byte *>(request.data()) + 1;
int message_type = static_cast<int>(*static_cast<const unsigned char *>(request.data()));
switch (message_type) {
case SUBMIT_EXECUTOR_BOUND: {
if (request.size() < MIN_SUBMIT_MESSAGE_LEN) {
std::cerr << "Invalid request\n";
continue;
}
std::string jobId(message, sk::JOB_ID_LEN);
uint32_t imageLen = sk::read_uint32(message_bytes + sk::JOB_ID_LEN);
uint32_t codeLen = sk::read_uint32(message_bytes + sk::JOB_ID_LEN + sizeof(uint32_t));
if (request.size() < MIN_SUBMIT_MESSAGE_LEN + imageLen + codeLen) {
std::cerr << "Request is too short\n";
continue;
}
std::string imageString(message + MIN_SUBMIT_MESSAGE_LEN, imageLen);
std::string requestString(message + MIN_SUBMIT_MESSAGE_LEN + imageLen, codeLen);
#ifndef NDEBUG
std::cout << "Executing " << codeLen << " bytes code.\n";
#endif
sk::program program{std::move(jobId), std::move(requestString), std::move(imageString)};
sk::runner *runner = runners.find_runner_for(program);
if (runner == nullptr) {
send(STDERR_CLIENT_BOUND, program.name, "No runner found for " + program.image);
sk::prepare_headers(sizeof(uint32_t), EXIT_CLIENT_BOUND, program.name);
auto [reply, reply_bytes] = sk::prepare_headers(sizeof(uint32_t), EXIT_CLIENT_BOUND, program.name);
sk::write_uint32(reply_bytes, -1);
sender.send(reply, zmq::send_flags::none);
continue;
}
sk::run_result result = runner->run_blocking(program);
if (!result.out.empty()) {
send(STDOUT_CLIENT_BOUND, program.name, result.out);
}
if (!result.err.empty()) {
send(STDERR_CLIENT_BOUND, program.name, result.err);
}
auto [reply, reply_bytes] = sk::prepare_headers(sizeof(uint32_t), EXIT_CLIENT_BOUND, program.name);
sk::write_uint32(reply_bytes, result.exit_code);
sender.send(reply, zmq::send_flags::none);
break;
}
case CANCEL_EXECUTOR_BOUND: {
if (request.size() < MIN_CANCEL_MESSAGE_LEN) {
std::cerr << "Invalid request\n";
continue;
}
std::string jobId(message, sk::JOB_ID_LEN);
runners.kill_active(jobId);
break;
}
default:
std::cerr << "Invalid " << std::hex << message_type << " message type\n";
break;
std::thread workerThread(mainLoop, std::ref(programLists));
rpc::server srv(9000);
srv.bind("add", [&hosts, &programLists](const std::string &prog, const std::string &image) {
programLists.push(sk::program{prog, image});
const sk::program &currentProgram = programLists.front();
const sk::host &currentHost = hosts.top();
sk::runner runner(detect_backend());
sk::run_result res = runner.run_blocking(currentProgram);
std::cout << "out: " << res.out << "\n";
std::cout << "err: " << res.err << "\n";
programLists.pop();
if (res.err != "") {
rpc::this_handler().respond_error(res.err);
}
}
return res.out;
});
srv.run();
return 0;
}
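The new entry point drops the ZeroMQ pull/push sockets in favor of an rpclib server on port 9000 that exposes a single `add` procedure returning the program's stdout. A hypothetical client-side call (not part of this change; the program text and image name are placeholders) could look like:

```cpp
#include "rpc/client.h"
#include <iostream>
#include <string>

int main() {
    rpc::client client("127.0.0.1", 9000);  // port taken from rpc::server srv(9000)
    // Arguments mirror the (prog, image) parameters bound to "add" in src/main.cpp.
    std::string out =
        client.call("add", std::string{"echo hello"}, std::string{"bash"}).as<std::string>();
    std::cout << out << "\n";
    return 0;
}
```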

src/network.cpp
@@ -1,27 +0,0 @@
#include "network.hpp"
#include <cstring>
namespace sk {
uint32_t read_uint32(const std::byte *buffer) { return static_cast<uint32_t>(buffer[3]) | static_cast<uint32_t>(buffer[2]) << 8 | static_cast<uint32_t>(buffer[1]) << 16 | static_cast<uint32_t>(buffer[0]) << 24; }
void write_uint32(std::byte *buffer, uint32_t value) {
buffer[0] = static_cast<std::byte>(value >> 24);
buffer[1] = static_cast<std::byte>(value >> 16);
buffer[2] = static_cast<std::byte>(value >> 8);
buffer[3] = static_cast<std::byte>(value);
}
void write_string(std::byte *buffer, std::string_view text) {
auto size = static_cast<uint32_t>(text.size());
write_uint32(buffer, size);
memcpy(buffer + sizeof(uint32_t), text.data(), size); // copy the text after its 4-byte length
}
std::tuple<zmq::message_t, std::byte *> prepare_headers(size_t data_len, int type, std::string_view jobId) {
zmq::message_t reply(1 + JOB_ID_LEN + data_len);
auto *reply_bytes = static_cast<std::byte *>(reply.data());
*reply_bytes = static_cast<std::byte>(type);
memcpy(reply_bytes + 1, jobId.data(), JOB_ID_LEN);
return {std::move(reply), reply_bytes + 1 + JOB_ID_LEN};
}
}

src/network.hpp
@@ -1,18 +0,0 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <tuple>
#include <zmq.hpp>
namespace sk {
static constexpr uint32_t JOB_ID_LEN = 32;
uint32_t read_uint32(const std::byte *buffer);
void write_uint32(std::byte *buffer, uint32_t value);
void write_string(std::byte *buffer, std::string_view text);
std::tuple<zmq::message_t, std::byte *> prepare_headers(size_t data_len, int type, std::string_view jobId);
}

src/program.hpp
@@ -5,7 +5,6 @@
namespace sk {
struct program {
std::string name;
std::string code;
std::string image;
};

src/runner.cpp
@@ -1,156 +1,62 @@
#include "runner.hpp"
#include <algorithm>
#include <array>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <spawn.h>
#include <sys/timerfd.h>
#include <sys/wait.h>
#include <system_error>
#include <unistd.h>
// Define a helper to throw a system error if a syscall fails
static auto ensure = [](int res) -> void {
if (res == -1) {
throw std::system_error{errno, std::generic_category()};
}
};
template <class> inline constexpr bool always_false_v = false;
#include <wait.h>
namespace sk {
execution_strategy::execution_strategy(const std::vector<std::string> &patterns) {
for (const auto &pattern : patterns) {
if (pattern == "{}") {
this->patterns.emplace_back(nullptr);
} else {
this->patterns.emplace_back(pattern);
}
}
}
execution_strategy::execution_strategy(std::vector<pattern> patterns) : patterns(std::move(patterns)) {}
std::unique_ptr<execution_strategy> execution_strategy::create(const runner_strategy_config &config) {
return std::visit(
[](auto &&strategy) -> std::unique_ptr<execution_strategy> {
using T = std::decay_t<decltype(strategy)>;
if constexpr (std::is_same_v<T, sk::bubblewrap_config>) {
return std::make_unique<bubblewrap_execution_strategy>(strategy.args);
} else if constexpr (std::is_same_v<T, sk::docker_config>) {
return std::make_unique<docker_execution_strategy>(strategy.args, strategy.image);
} else {
static_assert(always_false_v<T>, "non-exhaustive visitor!");
}
},
config);
}
void execution_strategy::concat_patterns(std::vector<const char *> &args, const program &program) const {
for (const auto &pattern : patterns) {
std::visit(
[&args, &program](const auto &arg) {
if constexpr (std::is_same_v<std::string, std::decay_t<decltype(arg)>>) {
args.push_back(arg.c_str());
} else {
args.push_back(program.code.c_str());
}
},
pattern);
}
}
bubblewrap_execution_strategy::bubblewrap_execution_strategy(const std::vector<std::string> &patterns) : execution_strategy{patterns} {}
bubblewrap_execution_strategy::bubblewrap_execution_strategy(std::vector<pattern> patterns) : execution_strategy{std::move(patterns)} {}
std::vector<const char *> bubblewrap_execution_strategy::start(const program &program) {
std::vector<const char *> args{"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "--die-with-parent"};
concat_patterns(args, program);
return args;
}
void bubblewrap_execution_strategy::stop(const active_job &job) { ensure(kill(job.pid, SIGINT)); }
docker_execution_strategy::docker_execution_strategy(const std::vector<std::string> &patterns, std::string image) : execution_strategy{patterns}, image{std::move(image)} {}
docker_execution_strategy::docker_execution_strategy(std::vector<pattern> patterns, std::string image) : execution_strategy{std::move(patterns)}, image{std::move(image)} {}
std::vector<const char *> docker_execution_strategy::start(const program &program) {
std::vector<const char *> args{"docker", "run", "--rm", "-i", "--name", program.name.c_str(), "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", image.c_str()};
concat_patterns(args, program);
return args;
}
void docker_execution_strategy::stop(const active_job &job) {
const char *const kill_args[] = {"docker", "kill", job.job_id.c_str(), nullptr};
pid_t kill_pid;
ensure(posix_spawnp(&kill_pid, kill_args[0], nullptr, nullptr, const_cast<char *const *>(kill_args), nullptr));
}
runner::runner(const runner_strategy_config &config, unsigned int timeout) : backend{execution_strategy::create(config)}, timeout{static_cast<int>(timeout)} {}
runner::runner(const runner &other) : backend{&*other.backend}, timeout{other.timeout} {}
runner::runner(runner &&other) noexcept : backend{std::move(other.backend)}, timeout{other.timeout} {}
runner &runner::operator=(const runner &other) {
if (this == &other) {
return *this;
}
std::unique_ptr<execution_strategy> new_backend(&*other.backend);
backend = std::move(new_backend);
timeout = other.timeout;
return *this;
}
runner &runner::operator=(runner &&other) noexcept {
if (this == &other) {
return *this;
}
backend = std::move(other.backend);
timeout = other.timeout;
return *this;
}
runner::runner(runner_backend backend) : backend{backend} {}
run_result runner::run_blocking(const program &program) {
// Open file descriptors ahead of time
int in_pipe[2];
int out_pipe[2];
int err_pipe[2];
if (pipe2(in_pipe, O_CLOEXEC) == -1 || pipe2(out_pipe, O_CLOEXEC) == -1 || pipe2(err_pipe, O_CLOEXEC) == -1) {
throw std::system_error{errno, std::generic_category()};
}
// Create a timer that will be polled when the program runs too long
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
if (timerfd == -1) {
throw std::system_error{errno, std::generic_category()};
}
itimerspec timer{};
timer.it_value.tv_sec = timeout;
if (timerfd_settime(timerfd, 0, &timer, nullptr) == -1) {
if (pipe(in_pipe) == -1 || pipe(out_pipe) == -1 || pipe(err_pipe) == -1) {
throw std::system_error{errno, std::generic_category()};
}
// Avoid blocking on stdin, and being interrupted if the pipe is closed prematurely
int flags = fcntl(in_pipe[1], F_GETFL, 0);
fcntl(in_pipe[1], F_SETFL, flags | O_NONBLOCK);
posix_spawn_file_actions_t actions;
posix_spawn_file_actions_init(&actions);
posix_spawn_file_actions_addclose(&actions, timerfd);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO);
std::vector<const char *> args = backend->start(program);
args.push_back(nullptr);
posix_spawn_file_actions_addclose(&actions, in_pipe[1]);
posix_spawn_file_actions_addclose(&actions, out_pipe[0]);
posix_spawn_file_actions_addclose(&actions, err_pipe[0]);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], 0);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], 1);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], 2);
posix_spawn_file_actions_addclose(&actions, in_pipe[0]);
posix_spawn_file_actions_addclose(&actions, out_pipe[1]);
posix_spawn_file_actions_addclose(&actions, err_pipe[1]);
const char *const docker_args[] = {"docker",
"run",
"--rm",
"-i",
"--pull=never",
"--cap-drop=ALL",
"--network=none",
"--memory=64m",
"--memory-swap=64m",
"--pids-limit=128",
program.image.c_str(),
nullptr};
const char *const bwrap_args[] = {
"bwrap", "--ro-bind", "/usr", "/usr", "--dir",
"/tmp", "--dir", "/var", "--proc", "/proc",
"--dev", "/dev", "--symlink", "usr/lib", "/lib",
"--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin",
"/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all",
"/bin/sh", nullptr};
const char *const *args = docker_args;
if (backend == runner_backend::BubbleWrap) {
args = bwrap_args;
}
pid_t pid;
bool killed = false;
int exit_code;
if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast<char *const *>(args.data()), nullptr) != 0) {
if (posix_spawnp(&pid, args[0], &actions, nullptr,
const_cast<char *const *>(args), nullptr) != 0) {
throw std::system_error{errno, std::generic_category()};
}
@@ -158,145 +64,38 @@ run_result runner::run_blocking(const program &program) {
close(out_pipe[1]);
close(err_pipe[1]);
// Register the job as active
{
std::lock_guard<std::mutex> guard(active_jobs_mutex);
active_jobs.push_back(active_job{program.name, pid});
}
size_t len = program.code.size();
size_t window = 0;
const char *data = program.code.data();
write(in_pipe[1], program.code.data(), program.code.size());
close(in_pipe[1]);
std::array<char, 1024> buffer{};
std::string out;
std::string err;
std::array<pollfd, 4> plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}, pollfd{timerfd, POLLIN, 0}};
pollfd *pfds = plist.data();
nfds_t nfds = plist.size();
int res;
while ((res = poll(pfds, nfds, -1)) > 0 || (res == -1 && errno == EINTR)) {
if (res == -1) {
// Interrupted by a signal, retry
continue;
}
nfds_t polled_fds = 0;
if (nfds == plist.size()) {
// Poll stdin when we still have data to write
if (pfds[0].revents & (POLLHUP | POLLERR)) {
// Closing input prematurely
close(in_pipe[1]);
--nfds;
++pfds;
++polled_fds;
} else if (pfds[0].revents & POLLOUT) {
if (window >= len) {
// End input
close(in_pipe[1]);
--nfds;
++pfds;
} else {
// Write the current data window
size_t l = std::min(len - window, buffer.size());
write(pfds[0].fd, data + window, l);
window += l;
}
++polled_fds;
std::array<pollfd, 2> plist = {pollfd{out_pipe[0], POLLIN, 0},
pollfd{err_pipe[0], POLLIN, 0}};
while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) {
if (plist[0].revents & POLLIN) {
ssize_t bytes_read =
read(out_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()};
}
}
// Poll stdout, stderr and the timer
for (nfds_t i = nfds - 3; i < nfds; ++i) {
if (pfds[i].revents & POLLIN) {
ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size());
if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()};
}
if (pfds[i].fd == out_pipe[0]) {
out.append(buffer.data(), bytes_read);
} else if (pfds[i].fd == timerfd) {
std::lock_guard<std::mutex> guard(active_jobs_mutex);
auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; });
if (it != active_jobs.end()) {
exit(*it);
active_jobs.erase(it);
}
killed = true;
} else {
err.append(buffer.data(), bytes_read);
}
++polled_fds;
out.append(buffer.data(), bytes_read);
} else if (plist[1].revents & POLLIN) {
ssize_t bytes_read =
read(err_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()};
}
}
// If nothing was polled, we're done
if (polled_fds == 0) {
err.append(buffer.data(), bytes_read);
} else {
break;
}
}
waitpid(pid, &exit_code, 0);
close(out_pipe[0]);
close(err_pipe[0]);
close(timerfd);
// Remove the job from the active list
{
std::lock_guard<std::mutex> guard(active_jobs_mutex);
auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; });
if (it != active_jobs.end()) {
active_jobs.erase(it);
}
}
posix_spawn_file_actions_destroy(&actions);
return run_result{out, err, killed ? 124 : exit_code};
}
bool runner::kill_active(const std::string &jobId) {
std::lock_guard<std::mutex> guard(active_jobs_mutex);
auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [&jobId](const active_job &job) { return job.job_id == jobId; });
if (it != active_jobs.end()) {
exit(*it);
active_jobs.erase(it);
return true;
}
return false;
}
void runner::exit_active_jobs() {
std::lock_guard<std::mutex> guard(active_jobs_mutex);
for (const auto &job : active_jobs) {
exit(job);
}
active_jobs.clear();
}
void runner::exit(const active_job &job) { backend->stop(job); }
runner_list::runner_list(sk::runner_backend preferred_backend, const runner_config &config) {
for (const auto &strategy : config.strategies) {
runners.emplace(std::string{strategy.first}, runner{strategy.second, config.timeout});
}
}
runner *runner_list::find_runner_for(const program &program) {
auto it = runners.find(program.image);
if (it != runners.end()) {
return &it->second;
}
return nullptr;
}
bool runner_list::kill_active(const std::string &jobId) {
return std::any_of(runners.begin(), runners.end(), [&jobId](auto &runner) { return runner.second.kill_active(jobId); });
}
void runner_list::exit_active_jobs() {
for (auto &[name, runner] : runners) {
runner.exit_active_jobs();
}
return run_result{out, err};
}
}

src/runner.hpp
@@ -1,87 +1,21 @@
#pragma once
#include "config.hpp"
#include "program.hpp"
#include <memory>
#include <mutex>
#include <string>
#include <variant>
#include <vector>
namespace sk {
struct [[nodiscard]] run_result {
std::string out;
std::string err;
int exit_code;
};
struct [[nodiscard]] active_job {
std::string job_id;
pid_t pid;
};
enum class runner_backend { BubbleWrap, Docker };
using pattern = std::variant<std::string, void *>;
class execution_strategy {
std::vector<pattern> patterns;
public:
explicit execution_strategy(const std::vector<std::string> &patterns);
explicit execution_strategy(std::vector<pattern> patterns);
virtual std::vector<const char *> start(const program &program) = 0;
virtual void stop(const active_job &job) = 0;
virtual ~execution_strategy() = default;
static std::unique_ptr<execution_strategy> create(const runner_strategy_config &config);
protected:
void concat_patterns(std::vector<const char *> &args, const program &program) const;
};
class bubblewrap_execution_strategy : public execution_strategy {
public:
explicit bubblewrap_execution_strategy(const std::vector<std::string> &patterns);
explicit bubblewrap_execution_strategy(std::vector<pattern> patterns);
std::vector<const char *> start(const program &program) override;
void stop(const active_job &job) override;
};
class docker_execution_strategy : public execution_strategy {
private:
std::string image;
public:
docker_execution_strategy(const std::vector<std::string> &patterns, std::string image);
docker_execution_strategy(std::vector<pattern> patterns, std::string image);
std::vector<const char *> start(const program &program) override;
void stop(const active_job &job) override;
};
class runner {
std::vector<active_job> active_jobs;
std::mutex active_jobs_mutex;
std::unique_ptr<execution_strategy> backend;
int timeout;
runner_backend backend;
public:
explicit runner(const runner_strategy_config &config, unsigned int timeout = 2u);
runner(const runner &other);
runner(runner &&other) noexcept;
runner &operator=(const runner &other);
runner &operator=(runner &&other) noexcept;
explicit runner(runner_backend backend);
run_result run_blocking(const program &program);
bool kill_active(const std::string &jobId);
void exit_active_jobs();
void exit(const active_job &job);
};
class runner_list {
std::unordered_map<std::string, runner> runners;
public:
runner_list(sk::runner_backend preferred_backend, const runner_config &config);
runner *find_runner_for(const program &program);
bool kill_active(const std::string &jobId);
void exit_active_jobs();
};
}
