Use poll(2) for stdin #2

Merged
clement.freville2 merged 2 commits from sys/poll-stdin into master 2 years ago

@ -1,17 +1,14 @@
#include "config.hpp"
#include "network.hpp" #include "network.hpp"
#include "program.hpp" #include "program.hpp"
#include "runner.hpp" #include "runner.hpp"
#include <filesystem> #include <filesystem>
#include <iostream> #include <iostream>
#include <queue>
#include <spawn.h> #include <spawn.h>
#include <string_view> #include <string_view>
#include <toml++/toml.h> #include <toml++/toml.h>
#include <vector> #include <unistd.h>
#include <wait.h> #include <wait.h>
#include "host.hpp"
#include "zmq_addon.hpp" #include "zmq_addon.hpp"
static constexpr uint32_t JOB_ID_LEN = 32; static constexpr uint32_t JOB_ID_LEN = 32;
@ -33,18 +30,40 @@ sk::runner_backend detect_backend() {
return sk::runner_backend::BubbleWrap; return sk::runner_backend::BubbleWrap;
} }
int main() { int main(int argc, char **argv) {
std::vector<sk::host> listsHosts = sk::config::loadHostsFromToml("../conf.toml"); int opt;
std::priority_queue<sk::host> hosts(listsHosts.begin(), listsHosts.end()); std::optional<sk::runner_backend> selected_backend;
while ((opt = getopt(argc, argv, "bc")) != -1) {
switch (opt) {
case 'b':
selected_backend = sk::runner_backend::BubbleWrap;
break;
case 'c':
selected_backend = sk::runner_backend::Docker;
break;
default:
std::cerr << "Usage: %s [-bc] [script]\n";
return 1;
}
}
std::queue<sk::program> programQueue; sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend());
if (hosts.empty()) { if (optind < argc) {
std::cerr << "Pas de host" << std::endl; std::ifstream t(argv[optind]);
exit(1); if (!t) {
std::cerr << "Unable to open file " << argv[optind] << std::endl;
return 1;
}
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{code, "ghcr.io/moshell-lang/moshell"};
sk::run_result result = runner.run_blocking(program);
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
return 0;
} }
sk::runner runner(detect_backend());
zmq::context_t context(1); zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull); zmq::socket_t receiver(context, zmq::socket_type::pull);

@ -24,9 +24,9 @@ run_result runner::run_blocking(const program &program) {
posix_spawn_file_actions_addclose(&actions, in_pipe[1]); posix_spawn_file_actions_addclose(&actions, in_pipe[1]);
posix_spawn_file_actions_addclose(&actions, out_pipe[0]); posix_spawn_file_actions_addclose(&actions, out_pipe[0]);
posix_spawn_file_actions_addclose(&actions, err_pipe[0]); posix_spawn_file_actions_addclose(&actions, err_pipe[0]);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], 0); posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], 1); posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], 2); posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO);
posix_spawn_file_actions_addclose(&actions, in_pipe[0]); posix_spawn_file_actions_addclose(&actions, in_pipe[0]);
posix_spawn_file_actions_addclose(&actions, out_pipe[1]); posix_spawn_file_actions_addclose(&actions, out_pipe[1]);
posix_spawn_file_actions_addclose(&actions, err_pipe[1]); posix_spawn_file_actions_addclose(&actions, err_pipe[1]);
@ -46,30 +46,71 @@ run_result runner::run_blocking(const program &program) {
close(out_pipe[1]); close(out_pipe[1]);
close(err_pipe[1]); close(err_pipe[1]);
write(in_pipe[1], program.code.data(), program.code.size()); size_t len = program.code.size();
close(in_pipe[1]); size_t window = 0;
const char *data = program.code.data();
std::array<char, 1024> buffer{}; std::array<char, 1024> buffer{};
std::string out; std::string out;
std::string err; std::string err;
std::array<pollfd, 2> plist = {pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}}; std::array<pollfd, 3> plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}};
while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) { pollfd *pfds = plist.data();
if (plist[0].revents & POLLIN) { nfds_t nfds = plist.size();
ssize_t bytes_read = read(out_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) { int res;
throw std::system_error{errno, std::generic_category()}; while ((res = poll(pfds, nfds, -1)) > 0 || (res == -1 && errno == EINTR)) {
if (res == -1) {
// Interrupted by a signal, retry
continue;
} }
out.append(buffer.data(), bytes_read);
} else if (plist[1].revents & POLLIN) { nfds_t polled_fds = 0;
ssize_t bytes_read = read(err_pipe[0], buffer.data(), buffer.size()); if (nfds == plist.size()) {
// Poll stdin when we still have data to write
if (pfds[0].revents & (POLLHUP | POLLERR)) {
// Closing input prematurely
close(in_pipe[1]);
--nfds;
++pfds;
++polled_fds;
} else if (pfds[0].revents & POLLOUT) {
if (window >= len) {
// End input
close(in_pipe[1]);
--nfds;
++pfds;
} else {
// Write the current data window
size_t l = std::min(len - window, buffer.size());
write(pfds[0].fd, data + window, l);
window += l;
}
++polled_fds;
}
}
// Poll stdout and stderr
for (nfds_t i = nfds - 2; i < nfds; ++i) {
if (pfds[i].revents & POLLIN) {
ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size());
if (bytes_read == -1) { if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()}; throw std::system_error{errno, std::generic_category()};
} }
err.append(buffer.data(), bytes_read); if (pfds[i].fd == out_pipe[0]) {
out.append(buffer.data(), bytes_read);
} else { } else {
err.append(buffer.data(), bytes_read);
}
++polled_fds;
}
}
// If nothing was polled, we're done
if (polled_fds == 0) {
break; break;
} }
} }
waitpid(pid, &exit_code, 0); waitpid(pid, &exit_code, 0);
close(out_pipe[0]); close(out_pipe[0]);
close(err_pipe[0]); close(err_pipe[0]);

Loading…
Cancel
Save