Allow multiple backends #5

Merged
bastien.ollier merged 2 commits from feature/multiple-backends into master 1 year ago

@ -5,3 +5,20 @@ threads = 1
[runner]
timeout = 2
[runners.javascript]
type = "bubblewrap"
args = ["node", "-e", "{}"]
[runners.bun]
type = "bubblewrap"
args = ["bun", "run", "/dev/stdin"]
[runners.bash]
type = "bubblewrap"
args = ["bash", "-c", "{}"]
[runners.moshell]
type = "docker"
image = "ghcr.io/moshell-lang/moshell:master"
args = ["moshell", "-c", "{}"]

@ -4,6 +4,48 @@
#include <toml++/toml.h>
static sk::config read(const toml::table &config) {
std::unordered_map<std::string, sk::runner_strategy_config> strategies;
auto *table = config["runners"].as_table();
if (table != nullptr) {
for (auto [name, backend_config] : *table) {
auto *backend_table = backend_config.as_table();
if (backend_table == nullptr) {
std::cerr << "Invalid runner config for " << name << ": expected table\n";
continue;
}
auto *backend_type = backend_table->get_as<std::string>("type");
if (backend_type == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing type field\n";
continue;
}
auto *args = backend_table->get_as<toml::array>("args");
if (args == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing args field\n";
continue;
}
std::vector<std::string> args_vector;
for (const auto &arg : *args) {
auto *arg_string = arg.as_string();
if (arg_string == nullptr) {
std::cerr << "Invalid runner config for " << name << ": args must be strings\n";
continue;
}
args_vector.emplace_back(*arg_string);
}
if (*backend_type == "docker") {
auto *image = backend_table->get_as<std::string>("image");
if (image == nullptr) {
std::cerr << "Invalid runner config for " << name << ": missing image field\n";
continue;
}
strategies.emplace(name, sk::docker_config{{std::move(args_vector)}, std::string{*image}});
} else if (*backend_type == "bubblewrap") {
strategies.emplace(name, sk::bubblewrap_config{std::move(args_vector)});
} else {
std::cerr << "Invalid runner config for " << name << ": unknown type " << *backend_type << "\n";
}
}
}
return sk::config{
sk::queue_config{
config["queue"]["pull"].value_or("tcp://localhost:5557"),
@ -12,6 +54,7 @@ static sk::config read(const toml::table &config) {
},
sk::runner_config{
config["runner"]["timeout"].value_or(2u),
std::move(strategies),
},
};
}

@ -2,6 +2,9 @@
#include <filesystem>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>
namespace sk {
struct queue_config {
@ -10,8 +13,19 @@ struct queue_config {
unsigned int nb_threads;
};
struct bubblewrap_config {
std::vector<std::string> args;
};
struct docker_config : bubblewrap_config {
std::string image;
};
using runner_strategy_config = std::variant<bubblewrap_config, docker_config>;
struct runner_config {
unsigned int timeout;
std::unordered_map<std::string, runner_strategy_config> strategies;
};
struct config {

@ -36,7 +36,7 @@ sk::runner_backend detect_backend() {
return sk::runner_backend::BubbleWrap;
}
sk::runner *global_runner = nullptr;
sk::runner_list *global_runners = nullptr;
int main(int argc, char **argv) {
int opt;
@ -62,12 +62,12 @@ int main(int argc, char **argv) {
}
sk::config config = sk::config::read_or_default(config_path, !may_use_default_config);
sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend(), config.runner);
global_runner = &runner;
sk::runner_list runners(selected_backend.has_value() ? selected_backend.value() : detect_backend(), config.runner);
global_runners = &runners;
struct sigaction action {};
action.sa_handler = [](int) {
if (global_runner) {
global_runner->exit_active_jobs();
if (global_runners) {
global_runners->exit_active_jobs();
}
};
sigaction(SIGINT, &action, nullptr);
@ -82,8 +82,13 @@ int main(int argc, char **argv) {
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{"sample-code", code, "ghcr.io/moshell-lang/moshell:master"};
sk::run_result result = runner.run_blocking(program);
sk::program program{"sample-code", code, "moshell"};
sk::runner *runner = runners.find_runner_for(program);
if (runner == nullptr) {
std::cerr << "No runner found for " << program.image << std::endl;
return 1;
}
sk::run_result result = runner->run_blocking(program);
std::cout << "exited with: " << result.exit_code << "\n";
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
@ -132,7 +137,16 @@ int main(int argc, char **argv) {
std::cout << "Executing " << codeLen << " bytes code.\n";
#endif
sk::program program{std::move(jobId), std::move(requestString), std::move(imageString)};
sk::run_result result = runner.run_blocking(program);
sk::runner *runner = runners.find_runner_for(program);
if (runner == nullptr) {
send(STDERR_CLIENT_BOUND, program.name, "No runner found for " + program.image);
sk::prepare_headers(sizeof(uint32_t), EXIT_CLIENT_BOUND, program.name);
auto [reply, reply_bytes] = sk::prepare_headers(sizeof(uint32_t), EXIT_CLIENT_BOUND, program.name);
sk::write_uint32(reply_bytes, -1);
sender.send(reply, zmq::send_flags::none);
continue;
}
sk::run_result result = runner->run_blocking(program);
if (!result.out.empty()) {
send(STDOUT_CLIENT_BOUND, program.name, result.out);
@ -151,7 +165,7 @@ int main(int argc, char **argv) {
continue;
}
std::string jobId(message, sk::JOB_ID_LEN);
runner.kill_active(jobId);
runners.kill_active(jobId);
break;
}
default:

@ -18,8 +18,102 @@ static auto ensure = [](int res) -> void {
}
};
template <class> inline constexpr bool always_false_v = false;
namespace sk {
runner::runner(runner_backend backend, const runner_config &config) : backend{backend}, timeout{static_cast<int>(config.timeout)} {}
execution_strategy::execution_strategy(const std::vector<std::string> &patterns) {
for (const auto &pattern : patterns) {
if (pattern == "{}") {
this->patterns.emplace_back(nullptr);
} else {
this->patterns.emplace_back(pattern);
}
}
}
execution_strategy::execution_strategy(std::vector<pattern> patterns) : patterns(std::move(patterns)) {}
std::unique_ptr<execution_strategy> execution_strategy::create(const runner_strategy_config &config) {
return std::visit(
[](auto &&strategy) -> std::unique_ptr<execution_strategy> {
using T = std::decay_t<decltype(strategy)>;
if constexpr (std::is_same_v<T, sk::bubblewrap_config>) {
return std::make_unique<bubblewrap_execution_strategy>(strategy.args);
} else if constexpr (std::is_same_v<T, sk::docker_config>) {
return std::make_unique<docker_execution_strategy>(strategy.args, strategy.image);
} else {
static_assert(always_false_v<T>, "non-exhaustive visitor!");
}
},
config);
}
void execution_strategy::concat_patterns(std::vector<const char *> &args, const program &program) const {
for (const auto &pattern : patterns) {
std::visit(
[&args, &program](const auto &arg) {
if constexpr (std::is_same_v<std::string, std::decay_t<decltype(arg)>>) {
args.push_back(arg.c_str());
} else {
args.push_back(program.code.c_str());
}
},
pattern);
}
}
bubblewrap_execution_strategy::bubblewrap_execution_strategy(const std::vector<std::string> &patterns) : execution_strategy{patterns} {}
bubblewrap_execution_strategy::bubblewrap_execution_strategy(std::vector<pattern> patterns) : execution_strategy{std::move(patterns)} {}
std::vector<const char *> bubblewrap_execution_strategy::start(const program &program) {
std::vector<const char *> args{"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "--die-with-parent"};
concat_patterns(args, program);
return args;
}
void bubblewrap_execution_strategy::stop(const active_job &job) { ensure(kill(job.pid, SIGINT)); }
docker_execution_strategy::docker_execution_strategy(const std::vector<std::string> &patterns, std::string image) : execution_strategy{patterns}, image{std::move(image)} {}
docker_execution_strategy::docker_execution_strategy(std::vector<pattern> patterns, std::string image) : execution_strategy{std::move(patterns)}, image{std::move(image)} {}
std::vector<const char *> docker_execution_strategy::start(const program &program) {
std::vector<const char *> args{"docker", "run", "--rm", "-i", "--name", program.name.c_str(), "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", image.c_str()};
concat_patterns(args, program);
return args;
}
void docker_execution_strategy::stop(const active_job &job) {
const char *const kill_args[] = {"docker", "kill", job.job_id.c_str(), nullptr};
pid_t kill_pid;
ensure(posix_spawnp(&kill_pid, kill_args[0], nullptr, nullptr, const_cast<char *const *>(kill_args), nullptr));
}
runner::runner(const runner_strategy_config &config, unsigned int timeout) : backend{execution_strategy::create(config)}, timeout{static_cast<int>(timeout)} {}
runner::runner(const runner &other) : backend{&*other.backend}, timeout{other.timeout} {}
runner::runner(runner &&other) noexcept : backend{std::move(other.backend)}, timeout{other.timeout} {}
runner &runner::operator=(const runner &other) {
if (this == &other) {
return *this;
}
std::unique_ptr<execution_strategy> new_backend(&*other.backend);
backend = std::move(new_backend);
timeout = other.timeout;
return *this;
}
runner &runner::operator=(runner &&other) noexcept {
if (this == &other) {
return *this;
}
backend = std::move(other.backend);
timeout = other.timeout;
return *this;
}
run_result runner::run_blocking(const program &program) {
// Open file descriptors ahead of time
@ -51,16 +145,12 @@ run_result runner::run_blocking(const program &program) {
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO);
const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--name", program.name.c_str(), "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr};
const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "--die-with-parent", "/bin/sh", nullptr};
const char *const *args = docker_args;
if (backend == runner_backend::BubbleWrap) {
args = bwrap_args;
}
std::vector<const char *> args = backend->start(program);
args.push_back(nullptr);
pid_t pid;
bool killed = false;
int exit_code;
if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast<char *const *>(args), nullptr) != 0) {
if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast<char *const *>(args.data()), nullptr) != 0) {
throw std::system_error{errno, std::generic_category()};
}
@ -184,13 +274,29 @@ void runner::exit_active_jobs() {
active_jobs.clear();
}
void runner::exit(const active_job &job) {
if (backend == runner_backend::Docker) {
const char *const kill_args[] = {"docker", "kill", job.job_id.c_str(), nullptr};
pid_t kill_pid;
ensure(posix_spawnp(&kill_pid, kill_args[0], nullptr, nullptr, const_cast<char *const *>(kill_args), nullptr));
} else {
ensure(kill(job.pid, SIGINT));
void runner::exit(const active_job &job) { backend->stop(job); }
runner_list::runner_list(sk::runner_backend preferred_backend, const runner_config &config) {
for (const auto &strategy : config.strategies) {
runners.emplace(std::string{strategy.first}, runner{strategy.second, config.timeout});
}
}
runner *runner_list::find_runner_for(const program &program) {
auto it = runners.find(program.image);
if (it != runners.end()) {
return &it->second;
}
return nullptr;
}
bool runner_list::kill_active(const std::string &jobId) {
return std::any_of(runners.begin(), runners.end(), [&jobId](auto &runner) { return runner.second.kill_active(jobId); });
}
void runner_list::exit_active_jobs() {
for (auto &[name, runner] : runners) {
runner.exit_active_jobs();
}
}
}

@ -2,8 +2,10 @@
#include "config.hpp"
#include "program.hpp"
#include <memory>
#include <mutex>
#include <string>
#include <variant>
#include <vector>
namespace sk {
@ -19,20 +21,67 @@ struct [[nodiscard]] active_job {
};
enum class runner_backend { BubbleWrap, Docker };
using pattern = std::variant<std::string, void *>;
class execution_strategy {
std::vector<pattern> patterns;
public:
explicit execution_strategy(const std::vector<std::string> &patterns);
explicit execution_strategy(std::vector<pattern> patterns);
virtual std::vector<const char *> start(const program &program) = 0;
virtual void stop(const active_job &job) = 0;
virtual ~execution_strategy() = default;
static std::unique_ptr<execution_strategy> create(const runner_strategy_config &config);
protected:
void concat_patterns(std::vector<const char *> &args, const program &program) const;
};
class bubblewrap_execution_strategy : public execution_strategy {
public:
explicit bubblewrap_execution_strategy(const std::vector<std::string> &patterns);
explicit bubblewrap_execution_strategy(std::vector<pattern> patterns);
std::vector<const char *> start(const program &program) override;
void stop(const active_job &job) override;
};
class docker_execution_strategy : public execution_strategy {
private:
std::string image;
public:
docker_execution_strategy(const std::vector<std::string> &patterns, std::string image);
docker_execution_strategy(std::vector<pattern> patterns, std::string image);
std::vector<const char *> start(const program &program) override;
void stop(const active_job &job) override;
};
class runner {
runner_backend backend;
int timeout;
std::vector<active_job> active_jobs;
std::mutex active_jobs_mutex;
std::unique_ptr<execution_strategy> backend;
int timeout;
public:
runner(runner_backend backend, const runner_config &config);
explicit runner(const runner_strategy_config &config, unsigned int timeout = 2u);
runner(const runner &other);
runner(runner &&other) noexcept;
runner &operator=(const runner &other);
runner &operator=(runner &&other) noexcept;
run_result run_blocking(const program &program);
bool kill_active(const std::string &jobId);
void exit_active_jobs();
private:
void exit(const active_job &job);
};
class runner_list {
std::unordered_map<std::string, runner> runners;
public:
runner_list(sk::runner_backend preferred_backend, const runner_config &config);
runner *find_runner_for(const program &program);
bool kill_active(const std::string &jobId);
void exit_active_jobs();
};
}

Loading…
Cancel
Save