You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
3.7 KiB

#include "network.hpp"
#include "program.hpp"
#include "runner.hpp"
#include <filesystem>
#include <iostream>
#include <spawn.h>
#include <toml++/toml.h>
#include <unistd.h>
#include <wait.h>
#include "zmq_addon.hpp"
static constexpr uint32_t JOB_ID_LEN = 32;
static constexpr uint32_t MIN_MESSAGE_LEN = JOB_ID_LEN + sizeof(uint32_t) * 2;
sk::runner_backend detect_backend() {
const char *const argv[] = {"docker", "stats", "--no-stream", nullptr};
pid_t pid;
if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast<char *const *>(argv), nullptr) != 0) {
return sk::runner_backend::BubbleWrap;
}
int status = 0;
waitpid(pid, &status, 0);
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
std::cerr << "Using Docker" << std::endl;
return sk::runner_backend::Docker;
}
return sk::runner_backend::BubbleWrap;
}
int main(int argc, char **argv) {
int opt;
std::optional<sk::runner_backend> selected_backend;
while ((opt = getopt(argc, argv, "bc")) != -1) {
switch (opt) {
case 'b':
selected_backend = sk::runner_backend::BubbleWrap;
break;
case 'c':
selected_backend = sk::runner_backend::Docker;
break;
default:
std::cerr << "Usage: %s [-bc] [script]\n";
return 1;
}
}
sk::runner runner(selected_backend.has_value() ? selected_backend.value() : detect_backend());
if (optind < argc) {
std::ifstream t(argv[optind]);
if (!t) {
std::cerr << "Unable to open file " << argv[optind] << std::endl;
return 1;
}
std::stringstream buffer;
buffer << t.rdbuf();
std::string code = buffer.str();
sk::program program{"sample-code", code, "ghcr.io/moshell-lang/moshell:master"};
sk::run_result result = runner.run_blocking(program);
std::cout << "exited with: " << result.exit_code << "\n";
std::cout << "out: " << result.out << "\n";
std::cout << "err: " << result.err << "\n";
return 0;
}
zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull);
receiver.connect("tcp://localhost:5557");
zmq::socket_t sender(context, zmq::socket_type::push);
sender.connect("tcp://localhost:5558");
while (true) {
zmq::message_t request;
receiver.recv(request);
if (request.size() < MIN_MESSAGE_LEN) {
std::cerr << "Invalid request" << std::endl;
break;
}
std::string jobId(static_cast<char *>(request.data()), JOB_ID_LEN);
uint32_t imageLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN);
uint32_t codeLen = sk::read_uint32(static_cast<char *>(request.data()) + JOB_ID_LEN + sizeof(uint32_t));
if (request.size() < MIN_MESSAGE_LEN + imageLen + codeLen) {
std::cerr << "Invalid request" << std::endl;
break;
}
std::string imageString(static_cast<char *>(request.data()) + MIN_MESSAGE_LEN, imageLen);
std::string requestString(static_cast<char *>(request.data()) + MIN_MESSAGE_LEN + imageLen, codeLen);
std::cout << "Executing " << codeLen << " bytes code.\n";
sk::program program{jobId, requestString, imageString};
sk::run_result result = runner.run_blocking(program);
std::cout << "Result: " << result.out << std::endl;
// Send the job id and result.out to sink
zmq::message_t reply(JOB_ID_LEN + result.out.size());
memcpy(reply.data(), jobId.data(), JOB_ID_LEN);
memcpy(static_cast<char *>(reply.data()) + JOB_ID_LEN, result.out.data(), result.out.size());
sender.send(reply, zmq::send_flags::none);
}
return 0;
}