You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
sandkasten-web/src/app/services/connection.service.ts

121 lines
3.3 KiB

import { ChangeSet, Text } from '@codemirror/state';
import { EditorView } from 'codemirror';
import {
Update,
collab,
getSyncedVersion,
receiveUpdates,
sendableUpdates,
} from '@codemirror/collab';
import { ViewPlugin, ViewUpdate } from '@codemirror/view';
/**
 * Request/response layer on top of a WebSocket.
 *
 * Every outgoing message is tagged with a numeric `_request` id. An incoming
 * message carrying the same `_request` value resolves the promise returned by
 * the matching {@link Connection.request} call with the message's `payload`.
 */
export class Connection {
  /** Id that will be assigned to the next outgoing request. */
  private requestId = 0;

  /**
   * Resolvers of requests still waiting for a response, keyed by the string
   * form of the request id. A Map is used (rather than a plain object) so that
   * ids such as `__proto__` sent by a peer can never hit inherited properties.
   */
  private resolves = new Map<string, (value: any) => void>();

  /**
   * @param client Socket used for sending requests; a `message` listener is
   *   registered on it to route responses back to their pending requests.
   */
  constructor(private client: WebSocket) {
    client.addEventListener('message', (event) => {
      let response: unknown;
      try {
        response = JSON.parse(event.data);
      } catch (error) {
        // A malformed frame must not throw out of the event listener.
        console.error('Received invalid response', error);
        return;
      }

      // `in` throws on primitives, so make sure we have an object first.
      if (
        typeof response !== 'object' ||
        response === null ||
        !('_request' in response)
      ) {
        console.error('Received invalid response', response);
        return;
      }

      const { _request, payload } = response as {
        _request: unknown;
        payload?: unknown;
      };
      const key = String(_request);
      const resolve = this.resolves.get(key);
      if (!resolve) {
        console.error(
          'Received response for unknown or already used request',
          _request
        );
        return;
      }

      // Each request is answered once: drop the entry so the table does not
      // grow without bound and duplicate responses are reported.
      this.resolves.delete(key);
      resolve(payload);
    });
  }

  /**
   * Sends `body` (plus a `_request` id) as JSON and waits for the response.
   *
   * The caller's object is not modified; the id is added to a copy.
   *
   * @param body JSON-serialisable message to send.
   * @returns Promise resolved with the `payload` of the matching response.
   *   It stays pending if no response with the same id ever arrives.
   * @throws Whatever `WebSocket.send` throws; in that case no request is
   *   registered and the id is not consumed.
   */
  request(body: Record<string, unknown>): Promise<any> {
    const id = this.requestId;
    this.client.send(JSON.stringify({ ...body, _request: id }));
    this.requestId = id + 1;
    return new Promise((resolve) => {
      this.resolves.set(String(id), resolve);
    });
  }
}
/**
 * Sends a batch of updates to the server as a `pushUpdates` request.
 *
 * @param connection Connection used to issue the request.
 * @param version Document version forwarded in the request.
 * @param fullUpdates Updates to send.
 * @returns Promise resolved with the payload of the server's response.
 */
function pushUpdates(
  connection: Connection,
  version: number,
  fullUpdates: readonly Update[]
): Promise<boolean> {
  // Only the client id and the serialised change set go over the wire;
  // any other data attached to an update is dropped.
  const updates: { clientID: string; changes: unknown }[] = [];
  for (const { clientID, changes } of fullUpdates) {
    updates.push({ clientID, changes: changes.toJSON() });
  }
  return connection.request({ type: 'pushUpdates', version, updates });
}
/**
 * Issues a `pullUpdates` request and converts the response into updates.
 *
 * @param connection Connection used to issue the request.
 * @param version Document version forwarded in the request.
 * @returns Promise resolved with the received updates, each with its
 *   `changes` rebuilt from JSON via `ChangeSet.fromJSON`.
 */
function pullUpdates(
  connection: Connection,
  version: number
): Promise<readonly Update[]> {
  // Rebuilds one update from its wire representation.
  const decode = (raw: any) => ({
    changes: ChangeSet.fromJSON(raw.changes),
    clientID: raw.clientID,
  });

  return connection
    .request({ type: 'pullUpdates', version })
    .then((payload) => payload.map((raw: any) => decode(raw)));
}
/**
 * Issues a `getDocument` request and returns the document it describes.
 *
 * @param connection Connection used to issue the request.
 * @returns Promise resolved with the `version` from the response and a
 *   `Text` built from the response's `doc` string, split on `'\n'`.
 */
export function getDocument(
  connection: Connection
): Promise<{ version: number; doc: Text }> {
  return connection.request({ type: 'getDocument' }).then((data) => {
    const version = data.version;
    const lines = data.doc.split('\n');
    return { version, doc: Text.of(lines) };
  });
}
/**
 * Builds the editor extensions that keep a view in sync through `connection`.
 *
 * The result combines `collab({ startVersion })` with a view plugin that
 * pushes local updates whenever the document changes and keeps pulling
 * remote updates until the plugin is destroyed.
 *
 * @param startVersion Version passed to `collab` as the starting version.
 * @param connection Connection used for the push and pull requests.
 * @returns Array containing the collab extension and the sync plugin.
 */
export function peerExtension(startVersion: number, connection: Connection) {
  const plugin = ViewPlugin.fromClass(
    class {
      /** True while a push request is in flight; prevents overlapping pushes. */
      private pushing = false;
      /** Set by `destroy`; stops the pull loop and any further retries. */
      private done = false;

      constructor(private view: EditorView) {
        this.pull();
      }

      update(update: ViewUpdate) {
        if (update.docChanged) this.push();
      }

      /** Sends the currently sendable updates, unless a push is already running. */
      async push() {
        const updates = sendableUpdates(this.view.state);
        if (this.pushing || !updates.length) return;
        this.pushing = true;
        const version = getSyncedVersion(this.view.state);
        try {
          await pushUpdates(connection, version, updates);
        } catch (error) {
          // push() is called fire-and-forget, so report the failure here
          // instead of leaving an unhandled rejection.
          console.error('Failed to push updates', error);
        } finally {
          // Always release the flag; otherwise a single failed request would
          // block every later push.
          this.pushing = false;
        }
        // Regardless of whether the push failed or new updates came in
        // while it was running, try again if there's updates remaining
        if (!this.done && sendableUpdates(this.view.state).length)
          setTimeout(() => this.push(), 100);
      }

      /**
       * Repeatedly requests updates newer than the synced version and applies
       * them to the view. The loop ends when the plugin is destroyed or a
       * request fails.
       * NOTE(review): presumably the server holds each request open until
       * updates exist; otherwise this loop polls continuously — confirm.
       */
      async pull() {
        try {
          while (!this.done) {
            const version = getSyncedVersion(this.view.state);
            const updates = await pullUpdates(connection, version);
            // The plugin may have been destroyed while the request was
            // pending; do not dispatch in that case.
            if (this.done) break;
            this.view.dispatch(receiveUpdates(this.view.state, updates));
          }
        } catch (error) {
          // pull() is started from the constructor without a handler.
          console.error('Failed to pull updates', error);
        }
      }

      destroy() {
        this.done = true;
      }
    }
  );
  return [collab({ startVersion }), plugin];
}