Rewrite using Fastify
continuous-integration/drone/push Build is passing Details

pull/1/head
Clément FRÉVILLE 1 year ago
parent 801f16c885
commit 5fb98c0395

@ -11,10 +11,11 @@
"tsx": "^4.7.0", "tsx": "^4.7.0",
"typescript": "^5.3.3" "typescript": "^5.3.3"
}, },
"peerDependencies": {
"typescript": "^5.0.0"
},
"dependencies": { "dependencies": {
"@fastify/cors": "^8.5.0",
"@fastify/type-provider-typebox": "^4.0.0",
"@sinclair/typebox": "^0.32.9",
"fastify": "^4.25.2",
"nanoid": "^5.0.2", "nanoid": "^5.0.2",
"zeromq": "6.0.0-beta.17" "zeromq": "6.0.0-beta.17"
} }

@ -0,0 +1,13 @@
export const IMAGES = {
moshell: 'ghcr.io/moshell-lang/moshell:master',
};
export function allocateBuffer(jobId: string, code: string, image: string): Buffer {
const buffer = Buffer.allocUnsafe(jobId.length + image.length + code.length + 8);
buffer.write(jobId, 0);
buffer.writeUInt32BE(image.length, jobId.length);
buffer.writeUInt32BE(code.length, jobId.length + 4);
buffer.write(image, jobId.length + 8);
buffer.write(code, jobId.length + 8 + image.length);
return buffer;
}

@ -1,76 +1,56 @@
import http from 'http'; import cors from '@fastify/cors';
import { Type, TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import Fastify, { FastifyReply } from 'fastify';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { allocateBuffer, IMAGES } from 'runner';
import { Pull, Push } from 'zeromq'; import { Pull, Push } from 'zeromq';
const host = 'localhost';
const port = 3000;
const sender = new Push(); const sender = new Push();
await sender.bind(`tcp://127.0.0.1:5557`); await sender.bind(`tcp://127.0.0.1:5557`);
const receiver = new Pull(); const receiver = new Pull();
await receiver.bind(`tcp://127.0.0.1:5558`); await receiver.bind(`tcp://127.0.0.1:5558`);
const clients: Record<string, FastifyReply> = {};
const generateId = () => nanoid(32); const generateId = () => nanoid(32);
const clients: Record<string, http.ServerResponse> = {}; const fastify = Fastify({
logger: true,
const CORS = { }).withTypeProvider<TypeBoxTypeProvider>();
'Access-Control-Allow-Methods': '*', await fastify.register(cors, {
'Access-Control-Allow-Headers': '*', origin: process.env.ALLOW_ORIGIN || '*',
'Access-Control-Allow-Origin': process.env.ALLOW_ORIGIN || '*', });
};
const server = http.createServer((req, res) => { fastify.post('/run', {
if (req.method === 'OPTIONS') { schema: {
res.writeHead(200, CORS); body: Type.Object({
res.end(); code: Type.String(),
return; language: Type.String(),
} }),
switch (req.url) { },
case '/run': }, (req, reply) => {
const { code, language } = req.body;
const jobId = generateId(); const jobId = generateId();
const code = 'echo a'; const buffer = allocateBuffer(jobId, code, IMAGES.moshell);
const image = 'ghcr.io/moshell-lang/moshell:master';
const buffer = Buffer.allocUnsafe(jobId.length + image.length + code.length + 8);
buffer.write(jobId, 0);
buffer.writeUInt32BE(image.length, jobId.length);
buffer.writeUInt32BE(code.length, jobId.length + 4);
buffer.write(image, jobId.length + 8);
buffer.write(code, jobId.length + 8 + image.length);
res.writeHead(200, {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
...CORS,
});
sender.send(buffer).then(() => { sender.send(buffer).then(() => {
res.write('event: connected\n'); reply.raw.write('event: connected\n');
res.write(`data: ${jobId}\n`); reply.raw.write(`data: ${jobId}\n`);
res.write('id: 0\n\n'); reply.raw.write('id: 0\n\n');
}); });
req.on('close', () => { clients[jobId] = reply;
res.end('OK');
delete clients[jobId];
});
clients[jobId] = res;
break;
default:
res.writeHead(404, CORS);
res.end('404!');
}
});
server.listen(port, () => {
console.log(`Server is running on http://${host}:${port}`);
}); });
for await (const [buff] of receiver) { async function forwardOutput() {
for await (const [buff] of receiver) {
const jobId = buff.subarray(0, 32).toString(); const jobId = buff.subarray(0, 32).toString();
console.log(`Received ${jobId}`); console.log(`Received ${jobId}`);
const res = clients[jobId]; const reply = clients[jobId];
if (!res) { if (!reply) {
continue; continue;
} }
res.write('event: message\n'); reply.raw.write('event: message\n');
res.write(`data: ${encodeURIComponent(buff.subarray(32).toString())}\n`); reply.raw.write(`data: ${encodeURIComponent(buff.subarray(32).toString())}\n`);
res.write('id: 1\n\n'); reply.raw.write('id: 1\n\n');
}
} }
await Promise.all([fastify.listen({ port: 3000 }), forwardOutput()]);

Loading…
Cancel
Save