diff --git a/src/host.cpp b/src/host.cpp index 6b32fb8..a666728 100644 --- a/src/host.cpp +++ b/src/host.cpp @@ -4,7 +4,7 @@ namespace sk { host::host(const std::string &ip, unsigned int connectionsMax) : ip{ip}, connections{0}, connectionsMax{connectionsMax}, runners{} {} void host::addConnection(sk::runner &runner) { - runners.push(runner); + runners.push(&runner); connections += 1; } diff --git a/src/host.hpp b/src/host.hpp index a423692..3031b38 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -11,7 +11,7 @@ class host { unsigned int connections; unsigned int connectionsMax; - std::queue runners; + std::queue runners; public: host(const std::string &ip, unsigned int connectionsMax); diff --git a/src/main.cpp b/src/main.cpp index 8c26c14..8e496ad 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,10 +1,10 @@ #include "network.hpp" #include "program.hpp" #include "runner.hpp" +#include #include #include #include -#include #include #include #include @@ -22,7 +22,6 @@ sk::runner_backend detect_backend() { } int status = 0; waitpid(pid, &status, 0); - std::cout << status << std::endl; if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { std::cerr << "Using Docker" << std::endl; return sk::runner_backend::Docker; @@ -30,6 +29,8 @@ sk::runner_backend detect_backend() { return sk::runner_backend::BubbleWrap; } +sk::runner *global_runner = nullptr; + int main(int argc, char **argv) { int opt; std::optional selected_backend; @@ -48,6 +49,15 @@ int main(int argc, char **argv) { } sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend()); + global_runner = &runner; + struct sigaction action {}; + action.sa_handler = [](int) { + if (global_runner) { + global_runner->exit_active_jobs(); + } + }; + sigaction(SIGINT, &action, nullptr); + sigaction(SIGTERM, &action, nullptr); if (optind < argc) { std::ifstream t(argv[optind]); @@ -58,8 +68,9 @@ int main(int argc, char **argv) { std::stringstream buffer; buffer << t.rdbuf(); std::string code = buffer.str(); - sk::program program{code, "ghcr.io/moshell-lang/moshell"}; + sk::program program{"sample-code", code, "ghcr.io/moshell-lang/moshell:master"}; sk::run_result result = runner.run_blocking(program); + std::cout << "exited with: " << result.exit_code << "\n"; std::cout << "out: " << result.out << "\n"; std::cout << "err: " << result.err << "\n"; return 0; @@ -79,7 +90,7 @@ int main(int argc, char **argv) { break; } - std::string_view jobId(static_cast(request.data()), JOB_ID_LEN); + std::string jobId(static_cast(request.data()), JOB_ID_LEN); uint32_t imageLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN); uint32_t codeLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN + sizeof(uint32_t)); @@ -91,15 +102,16 @@ int main(int argc, char **argv) { std::string requestString(static_cast(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen); std::cout << "Executing " << codeLen << " bytes code.\n"; - sk::program program{requestString, imageString}; + sk::program program{jobId, requestString, imageString}; sk::run_result result = runner.run_blocking(program); std::cout << "Result: " << result.out << std::endl; - // Send the job id and result.out to sink - zmq::message_t reply(JOB_ID_LEN + result.out.size()); + // Send the job id, the exit code and result.out to sink + zmq::message_t reply(JOB_ID_LEN + sizeof(uint32_t) + result.out.size()); memcpy(reply.data(), jobId.data(), JOB_ID_LEN); - memcpy(static_cast(reply.data()) + JOB_ID_LEN, result.out.data(), result.out.size()); + sk::write_uint32(static_cast(reply.data()) + JOB_ID_LEN, result.exit_code); + memcpy(static_cast(reply.data()) + JOB_ID_LEN + sizeof(uint32_t), result.out.data(), result.out.size()); sender.send(reply, zmq::send_flags::none); } return 0; diff --git a/src/network.cpp b/src/network.cpp index d5d3a0c..d567639 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -2,4 +2,10 @@ namespace sk { uint32_t read_uint32(const char *buffer) { return static_cast(buffer[3]) | static_cast(buffer[2]) << 8 | static_cast(buffer[1]) << 16 | static_cast(buffer[0]) << 24; } +void write_uint32(char *buffer, uint32_t value) { + buffer[0] = static_cast(value >> 24); + buffer[1] = static_cast(value >> 16); + buffer[2] = static_cast(value >> 8); + buffer[3] = static_cast(value); +} } diff --git a/src/network.hpp b/src/network.hpp index db7ab6c..a577cef 100644 --- a/src/network.hpp +++ b/src/network.hpp @@ -4,4 +4,5 @@ namespace sk { uint32_t read_uint32(const char *buffer); +void write_uint32(char *buffer, uint32_t value); } diff --git a/src/program.hpp b/src/program.hpp index a74a616..7779324 100644 --- a/src/program.hpp +++ b/src/program.hpp @@ -5,6 +5,7 @@ namespace sk { struct program { + std::string name; std::string code; std::string image; }; diff --git a/src/runner.cpp b/src/runner.cpp index 28a55d2..c1adef6 100644 --- a/src/runner.cpp +++ b/src/runner.cpp @@ -1,18 +1,30 @@ #include "runner.hpp" +#include #include #include #include #include #include +#include #include #include #include +static constexpr int TIMEOUT_SECONDS = 2; + +// Define a helper to throw a system error if a syscall fails +static auto ensure = [](int res) -> void { + if (res == -1) { + throw std::system_error{errno, std::generic_category()}; + } +}; + namespace sk { runner::runner(runner_backend backend) : backend{backend} {} run_result runner::run_blocking(const program &program) { + // Open file descriptors ahead of time int in_pipe[2]; int out_pipe[2]; int err_pipe[2]; @@ -20,6 +32,17 @@ run_result runner::run_blocking(const program &program) { throw std::system_error{errno, std::generic_category()}; } + // Create a timer that will be polled when the program runs too long + int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); + if (timerfd == -1) { + throw std::system_error{errno, std::generic_category()}; + } + itimerspec timer{}; + timer.it_value.tv_sec = TIMEOUT_SECONDS; + if (timerfd_settime(timerfd, 0, &timer, nullptr) == -1) { + throw std::system_error{errno, std::generic_category()}; + } + // Avoid blocking on stdin, and being interrupted if the pipe is closed prematurely int flags = fcntl(in_pipe[1], F_GETFL, 0); fcntl(in_pipe[1], F_SETFL, flags | O_NONBLOCK); @@ -29,19 +52,21 @@ run_result runner::run_blocking(const program &program) { posix_spawn_file_actions_addclose(&actions, in_pipe[1]); posix_spawn_file_actions_addclose(&actions, out_pipe[0]); posix_spawn_file_actions_addclose(&actions, err_pipe[0]); + posix_spawn_file_actions_addclose(&actions, timerfd); posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO); posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO); posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO); posix_spawn_file_actions_addclose(&actions, in_pipe[0]); posix_spawn_file_actions_addclose(&actions, out_pipe[1]); posix_spawn_file_actions_addclose(&actions, err_pipe[1]); - const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr}; - const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "/bin/sh", nullptr}; + const char *const docker_args[] = {"docker", "run", "--rm", "-i", "--name", program.name.c_str(), "--pull=never", "--cap-drop=ALL", "--network=none", "--memory=64m", "--memory-swap=64m", "--pids-limit=128", program.image.c_str(), nullptr}; + const char *const bwrap_args[] = {"bwrap", "--ro-bind", "/usr", "/usr", "--dir", "/tmp", "--dir", "/var", "--proc", "/proc", "--dev", "/dev", "--symlink", "usr/lib", "/lib", "--symlink", "usr/lib64", "/lib64", "--symlink", "usr/bin", "/bin", "--symlink", "usr/sbin", "/sbin", "--unshare-all", "--die-with-parent", "/bin/sh", nullptr}; const char *const *args = docker_args; if (backend == runner_backend::BubbleWrap) { args = bwrap_args; } pid_t pid; + bool killed = false; int exit_code; if (posix_spawnp(&pid, args[0], &actions, nullptr, const_cast(args), nullptr) != 0) { throw std::system_error{errno, std::generic_category()}; @@ -51,6 +76,12 @@ run_result runner::run_blocking(const program &program) { close(out_pipe[1]); close(err_pipe[1]); + // Register the job as active + { + std::lock_guard guard(active_jobs_mutex); + active_jobs.push_back(active_job{program.name, pid}); + } + size_t len = program.code.size(); size_t window = 0; const char *data = program.code.data(); @@ -58,7 +89,7 @@ run_result runner::run_blocking(const program &program) { std::array buffer{}; std::string out; std::string err; - std::array plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}}; + std::array plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}, pollfd{timerfd, POLLIN, 0}}; pollfd *pfds = plist.data(); nfds_t nfds = plist.size(); @@ -94,8 +125,8 @@ run_result runner::run_blocking(const program &program) { } } - // Poll stdout and stderr - for (nfds_t i = nfds - 2; i < nfds; ++i) { + // Poll stdout, stderr and the timer + for (nfds_t i = nfds - 3; i < nfds; ++i) { if (pfds[i].revents & POLLIN) { ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size()); if (bytes_read == -1) { @@ -103,6 +134,14 @@ run_result runner::run_blocking(const program &program) { } if (pfds[i].fd == out_pipe[0]) { out.append(buffer.data(), bytes_read); + } else if (pfds[i].fd == timerfd) { + std::lock_guard guard(active_jobs_mutex); + auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; }); + if (it != active_jobs.end()) { + exit(*it); + active_jobs.erase(it); + } + killed = true; } else { err.append(buffer.data(), bytes_read); } @@ -119,8 +158,36 @@ run_result runner::run_blocking(const program &program) { waitpid(pid, &exit_code, 0); close(out_pipe[0]); close(err_pipe[0]); + close(timerfd); + + // Remove the job from the active list + { + std::lock_guard guard(active_jobs_mutex); + auto it = std::find_if(active_jobs.begin(), active_jobs.end(), [pid](const active_job &job) { return job.pid == pid; }); + if (it != active_jobs.end()) { + active_jobs.erase(it); + } + } posix_spawn_file_actions_destroy(&actions); - return run_result{out, err}; + return run_result{out, err, killed ? 124 : exit_code}; +} + +void runner::exit_active_jobs() { + std::lock_guard guard(active_jobs_mutex); + for (const auto &job : active_jobs) { + exit(job); + } + active_jobs.clear(); +} + +void runner::exit(const active_job &job) { + if (backend == runner_backend::Docker) { + const char *const kill_args[] = {"docker", "kill", job.job_id.c_str(), nullptr}; + pid_t kill_pid; + ensure(posix_spawnp(&kill_pid, kill_args[0], nullptr, nullptr, const_cast(kill_args), nullptr)); + } else { + ensure(kill(job.pid, SIGINT)); + } } } diff --git a/src/runner.hpp b/src/runner.hpp index af5d732..89a3e8d 100644 --- a/src/runner.hpp +++ b/src/runner.hpp @@ -1,21 +1,35 @@ #pragma once #include "program.hpp" +#include #include +#include namespace sk { struct [[nodiscard]] run_result { std::string out; std::string err; + int exit_code; +}; + +struct [[nodiscard]] active_job { + std::string job_id; + pid_t pid; }; enum class runner_backend { BubbleWrap, Docker }; class runner { runner_backend backend; + std::vector active_jobs; + std::mutex active_jobs_mutex; public: explicit runner(runner_backend backend); run_result run_blocking(const program &program); + void exit_active_jobs(); + + private: + void exit(const active_job &job); }; }