#include "network.hpp" #include "program.hpp" #include "runner.hpp" #include #include #include #include #include #include #include #include "config.hpp" #include "zmq_addon.hpp" static constexpr uint32_t JOB_ID_LEN = 32; static constexpr uint32_t MIN_MESSAGE_LEN = JOB_ID_LEN + sizeof(uint32_t) * 2; sk::runner_backend detect_backend() { const char *const argv[] = {"docker", "stats", "--no-stream", nullptr}; pid_t pid; if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast(argv), nullptr) != 0) { return sk::runner_backend::BubbleWrap; } int status = 0; waitpid(pid, &status, 0); if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { std::cerr << "Using Docker" << std::endl; return sk::runner_backend::Docker; } return sk::runner_backend::BubbleWrap; } sk::runner *global_runner = nullptr; int main(int argc, char **argv) { int opt; std::optional selected_backend; std::filesystem::path config_path("config.toml"); bool may_use_default_config = true; while ((opt = getopt(argc, argv, "bc:d")) != -1) { switch (opt) { case 'b': selected_backend = sk::runner_backend::BubbleWrap; break; case 'c': config_path = optarg; may_use_default_config = false; break; case 'd': selected_backend = sk::runner_backend::Docker; break; default: std::cerr << "Usage: %s [-bc] [script]\n"; return 1; } } sk::config config = sk::config::read_or_default(config_path, !may_use_default_config); sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend(), config.runner); global_runner = &runner; struct sigaction action {}; action.sa_handler = [](int) { if (global_runner) { global_runner->exit_active_jobs(); } }; sigaction(SIGINT, &action, nullptr); sigaction(SIGTERM, &action, nullptr); if (optind < argc) { std::ifstream t(argv[optind]); if (!t) { std::cerr << "Unable to open file " << argv[optind] << std::endl; return 1; } std::stringstream buffer; buffer << t.rdbuf(); std::string code = buffer.str(); sk::program program{"sample-code", code, "ghcr.io/moshell-lang/moshell:master"}; sk::run_result result = runner.run_blocking(program); std::cout << "exited with: " << result.exit_code << "\n"; std::cout << "out: " << result.out << "\n"; std::cout << "err: " << result.err << "\n"; return 0; } zmq::context_t context(static_cast(config.queue.nb_threads)); zmq::socket_t receiver(context, zmq::socket_type::pull); receiver.connect(config.queue.pull_addr); zmq::socket_t sender(context, zmq::socket_type::push); sender.connect(config.queue.push_addr); while (true) { zmq::message_t request; receiver.recv(request); if (request.size() < MIN_MESSAGE_LEN) { std::cerr << "Invalid request" << std::endl; continue; } std::string jobId(static_cast(request.data()), JOB_ID_LEN); uint32_t imageLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN); uint32_t codeLen = sk::read_uint32(static_cast(request.data()) + JOB_ID_LEN + sizeof(uint32_t)); if (request.size() < MIN_MESSAGE_LEN + imageLen + codeLen) { std::cerr << "Invalid request" << std::endl; continue; } std::string imageString(static_cast(request.data()) + MIN_MESSAGE_LEN, imageLen); std::string requestString(static_cast(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen); std::cout << "Executing " << codeLen << " bytes code.\n"; sk::program program{jobId, requestString, imageString}; sk::run_result result = runner.run_blocking(program); std::cout << "Result: " << result.out << std::endl; // Send the job id, the exit code and result.out to sink zmq::message_t reply(JOB_ID_LEN + sizeof(uint32_t) + result.out.size()); memcpy(reply.data(), jobId.data(), JOB_ID_LEN); sk::write_uint32(static_cast(reply.data()) + JOB_ID_LEN, result.exit_code); memcpy(static_cast(reply.data()) + JOB_ID_LEN + sizeof(uint32_t), result.out.data(), result.out.size()); sender.send(reply, zmq::send_flags::none); } return 0; }