Merge pull request 'Limit the execution time' (#3) from sys/timeout into master
continuous-integration/drone/push Build is passing Details

Reviewed-on: #3
pull/4/head
Bastien OLLIER 2 years ago
commit 913b6ee15b

@ -4,7 +4,7 @@ namespace sk {
host::host(const std::string &ip, unsigned int connectionsMax) : ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {}
void host::addConnection(sk::runner &runner) {
runners.push(runner);
runners.push(&runner);
connections += 1;
}

@ -11,7 +11,7 @@ class host {
unsigned int connections;
unsigned int connectionsMax;
std::queue<sk::runner> runners;
std::queue<sk::runner *> runners;
public:
host(const std::string &ip, unsigned int connectionsMax);

@ -1,10 +1,10 @@
#include "network.hpp"
#include "program.hpp"
#include "runner.hpp"
#include <csignal>
#include <filesystem>
#include <iostream>
#include <spawn.h>
#include <string_view>
#include <toml++/toml.h>
#include <unistd.h>
#include <wait.h>
@ -22,7 +22,6 @@ sk::runner_backend detect_backend() {
}
int status = 0;
waitpid(pid, &status, 0);
std::cout << status << std::endl;
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
std::cerr << "Using Docker" << std::endl;
return sk::runner_backend::Docker;
@ -30,6 +29,8 @@ sk::runner_backend detect_backend() {
return sk::runner_backend::BubbleWrap;
}
sk::runner *global_runner = nullptr;
int main(int argc, char **argv) {
int opt;
std::optional<sk::runner_backend> selected_backend;
@ -48,6 +49,15 @@ int main(int argc, char **argv) {
}
sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend());
global_runner = &runner;
struct sigaction action {};
action.sa_handler = [](int) {
if (global_runner) {
global_runner->exit_active_jobs();
}
};
sigaction(SIGINT, &action, nullptr);
sigaction(SIGTERM, &action, nullptr);
if (optind < argc) {
std::ifstream t(argv[optind]);
@ -58,8 +68,9 @@ int main(int argc, char **argv) {
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{code, "ghcr.io/moshell-lang/moshell"};
sk::program program{"sample-code", code, "ghcr.io/moshell-lang/moshell:master"};
sk::run_result result = runner.run_blocking(program);
std::cout << "exited with: " << result.exit_code << "\n";
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
return 0;
@ -79,7 +90,7 @@ int main(int argc, char **argv) {
break;
}
std::string_view jobId(static_cast<char *>(request.data()), JOB_ID_LEN);
std::string jobId(static_cast<char *>(request.data()), JOB_ID_LEN);
uint32_t imageLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN);
uint32_t codeLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN + sizeof(uint32_t));
@ -91,15 +102,16 @@ int main(int argc, char **argv) {
std::string requestString(static_cast<char *>(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen);
std::cout << "Executing " << codeLen << " bytes code.\n";
sk::program program{requestString, imageString};
sk::program program{jobId, requestString, imageString};
sk::run_result result = runner.run_blocking(program);
std::cout << "Result: " << result.out << std::endl;
// Send the job id and result.out to sink
zmq::message_t reply(JOB_ID_LEN + result.out.size());
// Send the job id, the exit code and result.out to sink
zmq::message_t reply(JOB_ID_LEN + sizeof(uint32_t) + result.out.size());
memcpy(reply.data(), jobId.data(), JOB_ID_LEN);
memcpy(static_cast<char *>(reply.data()) + JOB_ID_LEN, result.out.data(), result.out.size());
sk::write_uint32(static_cast<char *>(reply.data()) + JOB_ID_LEN, result.exit_code);
memcpy(static_cast<char *>(reply.data()) + JOB_ID_LEN + sizeof(uint32_t), result.out.data(), result.out.size());
sender.send(reply, zmq::send_flags::none);
}
return 0;

@ -2,4 +2,10 @@
namespace sk {
uint32_t read_uint32(const char *buffer) { return static_cast<uint32_t>(buffer[3]) | static_cast<uint32_t>(buffer[2]) << 8 | static_cast<uint32_t>(buffer[1]) << 16 | static_cast<uint32_t>(buffer[0]) << 24; }
void write_uint32(char *buffer, uint32_t value) {
buffer[0] = static_cast<char>(value >> 24);
buffer[1] = static_cast<char>(value >> 16);
buffer[2] = static_cast<char>(value >> 8);
buffer[3] = static_cast<char>(value);
}
}

@ -4,4 +4,5 @@
namespace sk {
uint32_t read_uint32(const char *buffer);
void write_uint32(char *buffer, uint32_t value);
}

@ -5,6 +5,7 @@
namespace sk {
struct program {
std::string name;
std::string code;
std::string image;
};

@ -1,18 +1,30 @@
#include "runner.hpp"
#include <algorithm>
#include <array>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <spawn.h>
#include <sys/timerfd.h>
#include <system_error>
#include <unistd.h>
#include <wait.h>
static constexpr int TIMEOUT_SECONDS = 2;
// Define a helper to throw a system error if a syscall fails
static auto ensure = [](int res) -> void {
if (res == -1) {
throw std::system_error{errno, std::generic_category()};
}
};
namespace sk {
runner::runner(runner_backend backend) : backend{backend} {}
run_result runner::run_blocking(const program &program) {
// Open file descriptors ahead of time
int in_pipe[2];
int out_pipe[2];
int err_pipe[2];
@ -20,6 +32,17 @@ run_result runner::run_blocking(const program &program) {
throw std::system_error{errno, std::generic_category()};
}
// Create a timer that will be polled when the program runs too long
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
if (timerfd == -1) {
throw std::system_error{errno, std::generic_category()};
}
itimerspec timer{};
timer.it_value.tv_sec = TIMEOUT_SECONDS;
if (timerfd_settime(timerfd, 0, &timer, nullptr) == -1) {
throw std::system_error{errno, std::generic_category()};
}
// Avoid blocking on stdin, and being interrupted if the pipe is closed prematurely
int flags = fcntl(in_pipe[1], F_GETFL, 0);
fcntl(in_pipe[1], F_SETFL, flags | O_NONBLOCK);
@ -29,19 +52,21 @@ run_result runner::run_blocking(const program &program) {
posix_spawn_file_actions_addclose(&actions, in_pipe[1]);
posix_spawn_file_actions_addclose(&actions, out_pipe[0]);
posix_spawn_file_actions_addclose(&actions, err_pipe[0]);
posix_spawn_file_actions_addclose(&actions, timerfd);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO);
posix_spawn_file_actions_addclose(&actions, in_pipe[0]);
posix_spawn_file_actions_addclose(&actions, out_pipe[1]);
posix_spawn_file_actions_addclose(&actions, err_pipe[1]);
const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr};
const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "/bin/sh", nullptr};
const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--name", program.name.c_str(), "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr};
const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "--die-with-parent", "/bin/sh", nullptr};
const char *const *args = docker_args;
if (backend == runner_backend::BubbleWrap) {
args = bwrap_args;
}
pid_t pid;
bool killed = false;
int exit_code;
if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast<char *const *>(args), nullptr) != 0) {
throw std::system_error{errno, std::generic_category()};
@ -51,6 +76,12 @@ run_result runner::run_blocking(const program &program) {
close(out_pipe[1]);
close(err_pipe[1]);
// Register the job as active
{
std::lock_guard<std::mutex> guard(active_jobs_mutex);
active_jobs.push_back(active_job{program.name, pid});
}
size_t len = program.code.size();
size_t window = 0;
const char *data = program.code.data();
@ -58,7 +89,7 @@ run_result runner::run_blocking(const program &program) {
std::array<char, 1024> buffer{};
std::string out;
std::string err;
std::array<pollfd, 3> plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}};
std::array<pollfd, 4> plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}, pollfd{timerfd, POLLIN, 0}};
pollfd *pfds = plist.data();
nfds_t nfds = plist.size();
@ -94,8 +125,8 @@ run_result runner::run_blocking(const program &program) {
}
}
// Poll stdout and stderr
for (nfds_t i = nfds - 2; i < nfds; ++i) {
// Poll stdout, stderr and the timer
for (nfds_t i = nfds - 3; i < nfds; ++i) {
if (pfds[i].revents & POLLIN) {
ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size());
if (bytes_read == -1) {
@ -103,6 +134,14 @@ run_result runner::run_blocking(const program &program) {
}
if (pfds[i].fd == out_pipe[0]) {
out.append(buffer.data(), bytes_read);
} else if (pfds[i].fd == timerfd) {
std::lock_guard<std::mutex> guard(active_jobs_mutex);
auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; });
if (it != active_jobs.end()) {
exit(*it);
active_jobs.erase(it);
}
killed = true;
} else {
err.append(buffer.data(), bytes_read);
}
@ -119,8 +158,36 @@ run_result runner::run_blocking(const program &program) {
waitpid(pid, &exit_code, 0);
close(out_pipe[0]);
close(err_pipe[0]);
close(timerfd);
// Remove the job from the active list
{
std::lock_guard<std::mutex> guard(active_jobs_mutex);
auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; });
if (it != active_jobs.end()) {
active_jobs.erase(it);
}
}
posix_spawn_file_actions_destroy(&actions);
return run_result{out, err};
return run_result{out, err, killed ? 124 : exit_code};
}
void runner::exit_active_jobs() {
std::lock_guard<std::mutex> guard(active_jobs_mutex);
for (const auto &job : active_jobs) {
exit(job);
}
active_jobs.clear();
}
void runner::exit(const active_job &job) {
if (backend == runner_backend::Docker) {
const char *const kill_args[] = {"docker", "kill", job.job_id.c_str(), nullptr};
pid_t kill_pid;
ensure(posix_spawnp(&kill_pid, kill_args[0], nullptr, nullptr, const_cast<char *const *>(kill_args), nullptr));
} else {
ensure(kill(job.pid, SIGINT));
}
}
}

@ -1,21 +1,35 @@
#pragma once
#include "program.hpp"
#include <mutex>
#include <string>
#include <vector>
namespace sk {
struct [[nodiscard]] run_result {
std::string out;
std::string err;
int exit_code;
};
struct [[nodiscard]] active_job {
std::string job_id;
pid_t pid;
};
enum class runner_backend { BubbleWrap, Docker };
class runner {
runner_backend backend;
std::vector<active_job> active_jobs;
std::mutex active_jobs_mutex;
public:
explicit runner(runner_backend backend);
run_result run_blocking(const program &program);
void exit_active_jobs();
private:
void exit(const active_job &job);
};
}

Loading…
Cancel
Save