// This can be used to transfer an RTCDataChannel to a worker, and expose an
// interface that acts as a passthrough to the channel on the worker. There are
// some caveats though:
// * certain kinds of error won't propagate back to the callsite, and will
//   manifest as an unhandled error (eg; webidl errors on attribute setters)
// * the event handler/GC interactions won't translate exactly (because the
//   worker code registers all handler types up front, instead of waiting for
//   registrations to happen on the wrapper)
// * RTCDataChannel.label must be unique on the worker
class WorkerBackedDataChannel extends EventTarget {
  // Message types coming from the worker that are re-dispatched on this
  // wrapper as DOM events.
  static #EVENT_TYPES = Object.freeze([
    'open',
    'bufferedamountlow',
    'error',
    'closing',
    'close',
    'message',
  ]);

  #worker;
  #dcAttrs;
  #eventHandlers;
  #errorPromise;
  // Stays undefined until init() is called. Note that the empty string is a
  // valid label, so "uninitialized" must be tested against undefined, not
  // falsiness.
  #label;

  /**
   * If you want to make multiple of these with the same worker, create the
   * first with no args, and the others with first.worker.
   *
   * @param {Worker|null} worker - Worker to use; when omitted/null a new one
   *     is created with WorkerBackedDataChannel.makeWorker().
   */
  constructor(worker = null) {
    super();
    this.#worker = worker || WorkerBackedDataChannel.makeWorker();

    // Cache of the RTCDataChannel's state attributes, filled on init. Some
    // are updated by state updates later.
    this.#dcAttrs = null;

    // For tracking the onxxxx-style event callbacks
    // TODO: Maybe there's a simpler way to do this?
    this.#eventHandlers = new Map();
    this.#listenForEventMessages();

    // Ejection seat that we put in our promises, for cases where we've
    // misused the worker (or its code), or encountered some sort of unhandled
    // error. It never resolves; it only ever rejects.
    this.#errorPromise = new Promise((_, reject) => {
      // The Worker 'error' and 'messageerror' events. This listener is only
      // registered for those two types, so no further type check is needed.
      const onErrorEvent = (e) => {
        reject(new Error(`Worker sent ${e.type} event: ${e.message}`));
      };
      this.#worker.addEventListener('error', onErrorEvent);
      this.#worker.addEventListener('messageerror', onErrorEvent);

      // Unhandled exceptions thrown by *our* worker code; not Worker error
      // events (those are handled above), and not errors thrown by
      // RTCDataChannel (those are handled in #sendRequestToWorker).
      // A workerError without a label is treated as applying to every
      // wrapper that shares this worker.
      this.#worker.addEventListener('message', ({ data }) => {
        const { type, label, result } = data;
        const isForUs = label === undefined || label === this.#label;
        if (type === 'workerError' && isForUs) {
          reject(new Error(`Worker code sent error message: ${result}`));
        }
      });
    });
  }

  /**
   * Transfers the channel to the worker and caches the attribute snapshot
   * that the worker sends back in its 'initResponse'.
   *
   * @param {RTCDataChannel} channel - The channel to hand over to the worker.
   * @returns {Promise<object>} The attribute snapshot from the worker.
   */
  async init(channel) {
    this.#label = channel.label;
    // DO NOT GO ASYNC BEFORE THIS! Doing so will render the channel
    // untransferable.
    const initPromise = this.#sendRequestToWorker('init', channel, [channel]);
    this.#dcAttrs = await Promise.race([initPromise, this.#errorPromise]);
    return this.#dcAttrs;
  }

  /** Creates the worker that runs the RTCDataChannel side of the shim. */
  static makeWorker() {
    return new Worker('/webrtc/RTCDataChannel-worker.js');
  }

  // Make it easy to put more channels on this worker
  get worker() {
    return this.#worker;
  }

  // Read-only attributes. All of these read the cached snapshot, so they are
  // only usable once init() has resolved.
  get label() {
    return this.#dcAttrs.label;
  }

  get ordered() {
    return this.#dcAttrs.ordered;
  }

  get maxPacketLifeTime() {
    return this.#dcAttrs.maxPacketLifeTime;
  }

  get maxRetransmits() {
    return this.#dcAttrs.maxRetransmits;
  }

  get protocol() {
    return this.#dcAttrs.protocol;
  }

  get negotiated() {
    return this.#dcAttrs.negotiated;
  }

  get id() {
    return this.#dcAttrs.id;
  }

  get readyState() {
    return this.#dcAttrs.readyState;
  }

  get bufferedAmount() {
    return this.#dcAttrs.bufferedAmount;
  }

  // Writable attributes. The cache is updated immediately and the new value
  // is forwarded to the worker without waiting for the response; a failure on
  // the worker side therefore surfaces as an unhandled rejection (see the
  // caveats at the top of this file).
  set bufferedAmountLowThreshold(val) {
    this.#dcAttrs.bufferedAmountLowThreshold = val;
    this.#sendRequestToWorker('setBufferedAmountLowThreshold', val);
  }

  get bufferedAmountLowThreshold() {
    return this.#dcAttrs.bufferedAmountLowThreshold;
  }

  set binaryType(val) {
    this.#dcAttrs.binaryType = val;
    this.#sendRequestToWorker('setBinaryType', val);
  }

  get binaryType() {
    return this.#dcAttrs.binaryType;
  }

  // Note: these do not try to match the way the handler is registered on the
  // other end (eg; dc.onopen = handler is performed on the worker as an
  // addEventListener call, not as workerDc.onopen = func). This means that
  // this wrapper is not suitable for testing GC logic based on event handlers.
  set onopen(fn) {
    this.#setEventHandler('open', fn);
  }

  set onbufferedamountlow(fn) {
    this.#setEventHandler('bufferedamountlow', fn);
  }

  set onerror(fn) {
    this.#setEventHandler('error', fn);
  }

  set onclosing(fn) {
    this.#setEventHandler('closing', fn);
  }

  set onclose(fn) {
    this.#setEventHandler('close', fn);
  }

  set onmessage(fn) {
    this.#setEventHandler('message', fn);
  }

  /**
   * Asks the worker to send data on the channel.
   *
   * @param {*} data - Payload, posted to the worker as the request argument.
   * @returns {Promise<*>} Resolves with the worker's 'sendResponse' result;
   *     rejects if that result carries an error.
   */
  async send(data) {
    return this.#sendRequestToWorker('send', data);
  }

  /**
   * Asks the worker to close the channel.
   *
   * @returns {Promise<*>} Resolves with the worker's 'closeResponse' result;
   *     rejects if that result carries an error.
   */
  async close() {
    return this.#sendRequestToWorker('close');
  }

  /**
   * Used to refresh readyState, bufferedAmount, and id from the worker.
   *
   * @returns {Promise<object>} The worker's 'queryStateResponse' result.
   */
  async updateState() {
    const resp = await Promise.race([
      this.#sendRequestToWorker('queryState'),
      this.#errorPromise,
    ]);
    this.#dcAttrs.readyState = resp.readyState;
    this.#dcAttrs.bufferedAmount = resp.bufferedAmount;
    this.#dcAttrs.id = resp.id;
    return resp;
  }

  // Implements onxxxx semantics on top of addEventListener: at most one
  // handler per event type, replaced on each assignment, cleared by a falsy
  // value.
  #setEventHandler(type, handler) {
    const previous = this.#eventHandlers.get(type);
    if (previous) {
      this.removeEventListener(type, previous);
    }
    this.#eventHandlers.delete(type);
    if (handler) {
      this.addEventListener(type, handler);
      this.#eventHandlers.set(type, handler);
    }
  }

  // Re-dispatches channel events reported by the worker (matched by label) as
  // events on this wrapper.
  #listenForEventMessages() {
    this.#worker.addEventListener('message', ({ data }) => {
      const { type, label, result } = data;
      if (label !== this.#label ||
          !WorkerBackedDataChannel.#EVENT_TYPES.includes(type)) {
        return;
      }
      let event;
      if (type === 'message') {
        const { data: payload, origin } = result;
        event = new MessageEvent(type, { data: payload, origin });
      } else {
        event = new Event(type);
      }
      this.dispatchEvent(event);
    });
  }

  // Posts a {type, label, arg} request to the worker and resolves with the
  // result of the matching `${type}Response` message for this label.
  // Throws synchronously if init() has not been called yet.
  #sendRequestToWorker(type, arg, transferOrOptions) {
    // Compare against undefined rather than testing truthiness: the empty
    // string is a perfectly valid RTCDataChannel label.
    if (this.#label === undefined) {
      throw new Error('RTCDataChannel worker shim not initialized!');
    }

    return new Promise((resolve, reject) => {
      // We currently assume that if multiple requests of the same type are
      // sent, they get responses in the same order. That probably won't
      // change, but if it does we'll need a transaction id.
      const msg = { type, label: this.#label, arg };
      const responseType = `${type}Response`;

      const onResponse = ({ data }) => {
        const { type: respType, label, result } = data;
        if (respType !== responseType || label !== this.#label) {
          return;
        }
        this.#worker.removeEventListener('message', onResponse);
        if (result?.error) {
          // Error thrown by RTCDataChannel, other error cases are handled by
          // the code in this.#errorPromise
          // TODO: Maybe re-synthesize the specific error thrown by the
          // RTCDataChannel?
          reject(new Error(`RTCDataChannel error: ${result.error.message}`));
        } else {
          resolve(result);
        }
      };

      // Listen before posting so a fast response can't be missed.
      this.#worker.addEventListener('message', onResponse);
      this.#worker.postMessage(msg, transferOrOptions);
    });
  }
}