Merge pull request 'Use poll(2) for stdin' (#2) from sys/poll-stdin into master
continuous-integration/drone/push Build is passing Details

Reviewed-on: #2
pull/3/head
Clément FRÉVILLE 2 years ago
commit 60b4fce341

@ -1,17 +1,14 @@
#include "config.hpp"
#include "network.hpp"
#include "program.hpp"
#include "runner.hpp"
#include <filesystem>
#include <iostream>
#include <queue>
#include <spawn.h>
#include <string_view>
#include <toml++/toml.h>
#include <vector>
#include <unistd.h>
#include <wait.h>
#include "host.hpp"
#include "zmq_addon.hpp"
static constexpr uint32_t JOB_ID_LEN = 32;
@ -33,18 +30,40 @@ sk::runner_backend detect_backend() {
return sk::runner_backend::BubbleWrap;
}
int main() {
std::vector<sk::host> listsHosts = sk::config::loadHostsFromToml("../conf.toml");
std::priority_queue<sk::host> hosts(listsHosts.begin(), listsHosts.end());
int main(int argc, char **argv) {
int opt;
std::optional<sk::runner_backend> selected_backend;
while ((opt = getopt(argc, argv, "bc")) != -1) {
switch (opt) {
case 'b':
selected_backend = sk::runner_backend::BubbleWrap;
break;
case 'c':
selected_backend = sk::runner_backend::Docker;
break;
default:
std::cerr << "Usage: %s [-bc] [script]\n";
return 1;
}
}
std::queue<sk::program> programQueue;
sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend());
if (hosts.empty()) {
std::cerr << "Pas de host" << std::endl;
exit(1);
if (optind < argc) {
std::ifstream t(argv[optind]);
if (!t) {
std::cerr << "Unable to open file " << argv[optind] << std::endl;
return 1;
}
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{code, "ghcr.io/moshell-lang/moshell"};
sk::run_result result = runner.run_blocking(program);
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
return 0;
}
sk::runner runner(detect_backend());
zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull);

@ -24,9 +24,9 @@ run_result runner::run_blocking(const program &program) {
posix_spawn_file_actions_addclose(&actions, in_pipe[1]);
posix_spawn_file_actions_addclose(&actions, out_pipe[0]);
posix_spawn_file_actions_addclose(&actions, err_pipe[0]);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], 0);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], 1);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], 2);
posix_spawn_file_actions_adddup2(&actions, in_pipe[0], STDIN_FILENO);
posix_spawn_file_actions_adddup2(&actions, out_pipe[1], STDOUT_FILENO);
posix_spawn_file_actions_adddup2(&actions, err_pipe[1], STDERR_FILENO);
posix_spawn_file_actions_addclose(&actions, in_pipe[0]);
posix_spawn_file_actions_addclose(&actions, out_pipe[1]);
posix_spawn_file_actions_addclose(&actions, err_pipe[1]);
@ -46,30 +46,71 @@ run_result runner::run_blocking(const program &program) {
close(out_pipe[1]);
close(err_pipe[1]);
write(in_pipe[1], program.code.data(), program.code.size());
close(in_pipe[1]);
size_t len = program.code.size();
size_t window = 0;
const char *data = program.code.data();
std::array<char, 1024> buffer{};
std::string out;
std::string err;
std::array<pollfd, 2> plist = {pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}};
while (poll(plist.data(), plist.size(), /*timeout*/ -1) > 0) {
if (plist[0].revents & POLLIN) {
ssize_t bytes_read = read(out_pipe[0], buffer.data(), buffer.size());
if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()};
std::array<pollfd, 3> plist = {pollfd{in_pipe[1], POLLOUT | POLLHUP, 0}, pollfd{out_pipe[0], POLLIN, 0}, pollfd{err_pipe[0], POLLIN, 0}};
pollfd *pfds = plist.data();
nfds_t nfds = plist.size();
int res;
while ((res = poll(pfds, nfds, -1)) > 0 || (res == -1 && errno == EINTR)) {
if (res == -1) {
// Interrupted by a signal, retry
continue;
}
out.append(buffer.data(), bytes_read);
} else if (plist[1].revents & POLLIN) {
ssize_t bytes_read = read(err_pipe[0], buffer.data(), buffer.size());
nfds_t polled_fds = 0;
if (nfds == plist.size()) {
// Poll stdin when we still have data to write
if (pfds[0].revents & (POLLHUP | POLLERR)) {
// Closing input prematurely
close(in_pipe[1]);
--nfds;
++pfds;
++polled_fds;
} else if (pfds[0].revents & POLLOUT) {
if (window >= len) {
// End input
close(in_pipe[1]);
--nfds;
++pfds;
} else {
// Write the current data window
size_t l = std::min(len - window, buffer.size());
write(pfds[0].fd, data + window, l);
window += l;
}
++polled_fds;
}
}
// Poll stdout and stderr
for (nfds_t i = nfds - 2; i < nfds; ++i) {
if (pfds[i].revents & POLLIN) {
ssize_t bytes_read = read(pfds[i].fd, buffer.data(), buffer.size());
if (bytes_read == -1) {
throw std::system_error{errno, std::generic_category()};
}
err.append(buffer.data(), bytes_read);
if (pfds[i].fd == out_pipe[0]) {
out.append(buffer.data(), bytes_read);
} else {
err.append(buffer.data(), bytes_read);
}
++polled_fds;
}
}
// If nothing was polled, we're done
if (polled_fds == 0) {
break;
}
}
waitpid(pid, &exit_code, 0);
close(out_pipe[0]);
close(err_pipe[0]);

Loading…
Cancel
Save