From 26504eded573b6e73dde40a7572db458315c553d Mon Sep 17 00:00:00 2001 From: clfreville2 Date: Fri, 10 Nov 2023 08:18:27 +0100 Subject: [PATCH 1/2] Also poll stdin --- src/runner.cpp | 77 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 18 deletions(-) diff --git a/src/runner.cpp b/src/runner.cpp index f473c0c..dbece25 100644 --- a/src/runner.cpp +++ b/src/runner.cpp @@ -24,9 +24,9 @@ run_result runner::run_blocking(const program &program) { posix_spawn_file_actions_addclose(&actions, in_pipe[1]); posix_spawn_file_actions_addclose(&actions, out_pipe[0]); posix_spawn_file_actions_addclose(&actions, err_pipe[0]); - posix_spawn_file_actions_adddup2(&actions, in_pipe[0], 0); - posix_spawn_file_actions_adddup2(&actions, out_pipe[1], 1); - posix_spawn_file_actions_adddup2(&actions, err_pipe[1], 2); + posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO); + posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO); + posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO); posix_spawn_file_actions_addclose(&actions, in_pipe[0]); posix_spawn_file_actions_addclose(&actions, out_pipe[1]); posix_spawn_file_actions_addclose(&actions, err_pipe[1]); @@ -46,30 +46,71 @@ run_result runner::run_blocking(const program &program) { close(out_pipe[1]); close(err_pipe[1]); - write(in_pipe[1], program.code.data(), program.code.size()); - close(in_pipe[1]); + size_t len = program.code.size(); + size_t window = 0; + const char *data = program.code.data(); std::array buffer{}; std::string out; std::string err; - std::array plist = {pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}}; - while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) { - if (plist[0].revents & POLLIN) { - ssize_t bytes_read = read(out_pipe[0], buffer.data(), buffer.size()); - if (bytes_read == -1) { - throw std::system_error{errno, std::generic_category()}; + std::array plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}}; + pollfd *pfds = plist.data(); + nfds_t nfds = plist.size(); + + int res; + while ((res = poll(pfds, nfds, -1)) > 0 || (res == -1 && errno == EINTR)) { + if (res == -1) { + // Interrupted by a signal, retry + continue; + } + + nfds_t polled_fds = 0; + if (nfds == plist.size()) { + // Poll stdin when we still have data to write + if (pfds[0].revents & (POLLHUP | POLLERR)) { + // Closing input prematurely + close(in_pipe[1]); + --nfds; + ++pfds; + ++polled_fds; + } else if (pfds[0].revents & POLLOUT) { + if (window >= len) { + // End input + close(in_pipe[1]); + --nfds; + ++pfds; + } else { + // Write the current data window + size_t l = std::min(len - window, buffer.size()); + write(pfds[0].fd, data + window, l); + window += l; + } + ++polled_fds; } - out.append(buffer.data(), bytes_read); - } else if (plist[1].revents & POLLIN) { - ssize_t bytes_read = read(err_pipe[0], buffer.data(), buffer.size()); - if (bytes_read == -1) { - throw std::system_error{errno, std::generic_category()}; + } + + // Poll stdout and stderr + for (nfds_t i = nfds - 2; i < nfds; ++i) { + if (pfds[i].revents & POLLIN) { + ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size()); + if (bytes_read == -1) { + throw std::system_error{errno, std::generic_category()}; + } + if (pfds[i].fd == out_pipe[0]) { + out.append(buffer.data(), bytes_read); + } else { + err.append(buffer.data(), bytes_read); + } + ++polled_fds; } - err.append(buffer.data(), bytes_read); - } else { + } + + // If nothing was polled, we're done + if (polled_fds == 0) { break; } } + waitpid(pid, &exit_code, 0); close(out_pipe[0]); close(err_pipe[0]); From e5598e69de44699bf761ddbf12af565f39a1b6f5 Mon Sep 17 00:00:00 2001 From: clfreville2 Date: Fri, 10 Nov 2023 08:22:23 +0100 Subject: [PATCH 2/2] Test the runner via the command line The executable now takes an optional backend and a script to run. This greatly simplify testing without needing the entire server. --- src/main.cpp | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 7268c51..8c26c14 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,17 +1,14 @@ -#include "config.hpp" #include "network.hpp" #include "program.hpp" #include "runner.hpp" #include #include -#include #include #include #include -#include +#include #include -#include "host.hpp" #include "zmq_addon.hpp" static constexpr uint32_t JOB_ID_LEN = 32; @@ -33,19 +30,41 @@ sk::runner_backend detect_backend() { return sk::runner_backend::BubbleWrap; } -int main() { - std::vector listsHosts = sk::config::loadHostsFromToml("../conf.toml"); - std::priority_queue hosts(listsHosts.begin(), listsHosts.end()); +int main(int argc, char **argv) { + int opt; + std::optional selected_backend; + while ((opt = getopt(argc, argv, "bc")) != -1) { + switch (opt) { + case 'b': + selected_backend = sk::runner_backend::BubbleWrap; + break; + case 'c': + selected_backend = sk::runner_backend::Docker; + break; + default: + std::cerr << "Usage: %s [-bc] [script]\n"; + return 1; + } + } - std::queue programQueue; + sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend()); - if (hosts.empty()) { - std::cerr << "Pas de host" << std::endl; - exit(1); + if (optind < argc) { + std::ifstream t(argv[optind]); + if (!t) { + std::cerr << "Unable to open file " << argv[optind] << std::endl; + return 1; + } + std::stringstream buffer; + buffer << t.rdbuf(); + std::string code = buffer.str(); + sk::program program{code, "ghcr.io/moshell-lang/moshell"}; + sk::run_result result = runner.run_blocking(program); + std::cout << "out: " << result.out << "\n"; + std::cout << "err: " << result.err << "\n"; + return 0; } - sk::runner runner(detect_backend()); - zmq::context_t context(1); zmq::socket_t receiver(context, zmq::socket_type::pull); receiver.connect("tcp://localhost:5557");