'use strict'; function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; } var _require = require('events'); const EventEmitter = _require.EventEmitter; const os = require('os'); const errorUtils = require('./errorUtils'); const Worker = require('./Worker'); let shared = null; class WorkerFarm extends EventEmitter { constructor(options, farmOptions = {}) { super(); this.options = Object.assign({ maxConcurrentWorkers: WorkerFarm.getNumWorkers(), maxConcurrentCallsPerWorker: 10, forcedKillTime: 100, warmWorkers: true, useLocalWorker: true, workerPath: '../worker' }, farmOptions); this.started = false; this.warmWorkers = 0; this.children = new Map(); this.callQueue = []; this.localWorker = require(this.options.workerPath); this.run = this.mkhandle('run'); this.init(options); } warmupWorker(method, args) { // Workers have started, but are not warmed up yet. // Send the job to a remote worker in the background, // but use the result from the local worker - it will be faster. if (this.started) { let promise = this.addCall(method, [...args, true]); if (promise) { promise.then(() => { this.warmWorkers++; if (this.warmWorkers >= this.children.size) { this.emit('warmedup'); } }).catch(() => {}); } } } mkhandle(method) { return function (...args) { // Child process workers are slow to start (~600ms). // While we're waiting, just run on the main thread. // This significantly speeds up startup time. if (this.shouldUseRemoteWorkers()) { return this.addCall(method, [...args, false]); } else { if (this.options.warmWorkers) { this.warmupWorker(method, args); } return this.localWorker[method](...args, false); } }.bind(this); } onError(error, childId) { // Handle ipc errors if (error.code === 'ERR_IPC_CHANNEL_CLOSED') { return this.stopChild(childId); } } onExit(childId) { // delay this to give any sends a chance to finish setTimeout(() => { let doQueue = false; let child = this.children.get(childId); if (child && child.calls.size) { var _iteratorNormalCompletion = true; var _didIteratorError = false; var _iteratorError = undefined; try { for (var _iterator = child.calls.values()[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { let call = _step.value; call.retries++; this.callQueue.unshift(call); doQueue = true; } } catch (err) { _didIteratorError = true; _iteratorError = err; } finally { try { if (!_iteratorNormalCompletion && _iterator.return) { _iterator.return(); } } finally { if (_didIteratorError) { throw _iteratorError; } } } } this.stopChild(childId); if (doQueue) { this.processQueue(); } }, 10); } startChild() { let worker = new Worker(this.options.workerPath, this.options); worker.on('request', data => { this.processRequest(data, worker); }); worker.on('response', () => { // allow any outstanding calls to be processed this.processQueue(); }); worker.once('exit', () => { this.onExit(worker.id); }); worker.on('error', err => { this.onError(err, worker.id); }); this.children.set(worker.id, worker); } stopChild(childId) { let child = this.children.get(childId); if (child) { child.stop(); this.children.delete(childId); } } processQueue() { var _this = this; return _asyncToGenerator(function* () { if (_this.ending || !_this.callQueue.length) return; if (_this.children.size < _this.options.maxConcurrentWorkers) { _this.startChild(); } var _iteratorNormalCompletion2 = true; var _didIteratorError2 = false; var _iteratorError2 = undefined; try { for (var _iterator2 = _this.children.values()[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) { let child = _step2.value; if (!_this.callQueue.length) { break; } if (child.calls.size < _this.options.maxConcurrentCallsPerWorker) { child.call(_this.callQueue.shift()); } } } catch (err) { _didIteratorError2 = true; _iteratorError2 = err; } finally { try { if (!_iteratorNormalCompletion2 && _iterator2.return) { _iterator2.return(); } } finally { if (_didIteratorError2) { throw _iteratorError2; } } } })(); } processRequest(data, child = false) { return _asyncToGenerator(function* () { let result = { idx: data.idx, type: 'response' }; let method = data.method; let args = data.args; let location = data.location; let awaitResponse = data.awaitResponse; if (!location) { throw new Error('Unknown request'); } const mod = require(location); try { let func; if (method) { func = mod[method]; } else { func = mod; } result.contentType = 'data'; result.content = yield func(...args); } catch (e) { result.contentType = 'error'; result.content = errorUtils.errorToJson(e); } if (awaitResponse) { if (child) { child.send(result); } else { return result; } } })(); } addCall(method, args) { if (this.ending) return; // don't add anything new to the queue return new Promise((resolve, reject) => { this.callQueue.push({ method, args: args, retries: 0, resolve, reject }); this.processQueue(); }); } end() { var _this2 = this; return _asyncToGenerator(function* () { _this2.ending = true; var _iteratorNormalCompletion3 = true; var _didIteratorError3 = false; var _iteratorError3 = undefined; try { for (var _iterator3 = _this2.children.keys()[Symbol.iterator](), _step3; !(_iteratorNormalCompletion3 = (_step3 = _iterator3.next()).done); _iteratorNormalCompletion3 = true) { let childId = _step3.value; _this2.stopChild(childId); } } catch (err) { _didIteratorError3 = true; _iteratorError3 = err; } finally { try { if (!_iteratorNormalCompletion3 && _iterator3.return) { _iterator3.return(); } } finally { if (_didIteratorError3) { throw _iteratorError3; } } } _this2.ending = false; shared = null; })(); } init(options) { this.localWorker.init(options, true); this.initRemoteWorkers(options); } initRemoteWorkers(options) { var _this3 = this; return _asyncToGenerator(function* () { _this3.started = false; _this3.warmWorkers = 0; // Start workers if there isn't enough workers already for (let i = _this3.children.size; i < _this3.options.maxConcurrentWorkers; i++) { _this3.startChild(); } // Reliable way of initialising workers let promises = []; var _iteratorNormalCompletion4 = true; var _didIteratorError4 = false; var _iteratorError4 = undefined; try { for (var _iterator4 = _this3.children.values()[Symbol.iterator](), _step4; !(_iteratorNormalCompletion4 = (_step4 = _iterator4.next()).done); _iteratorNormalCompletion4 = true) { let child = _step4.value; promises.push(new Promise(function (resolve, reject) { child.call({ method: 'init', args: [options], retries: 0, resolve, reject }); })); } } catch (err) { _didIteratorError4 = true; _iteratorError4 = err; } finally { try { if (!_iteratorNormalCompletion4 && _iterator4.return) { _iterator4.return(); } } finally { if (_didIteratorError4) { throw _iteratorError4; } } } yield Promise.all(promises); if (_this3.options.maxConcurrentWorkers > 0) { _this3.started = true; _this3.emit('started'); } })(); } shouldUseRemoteWorkers() { return !this.options.useLocalWorker || this.started && (this.warmWorkers >= this.children.size || !this.options.warmWorkers); } static getShared(options) { if (!shared) { shared = new WorkerFarm(options); } else if (options) { shared.init(options); } return shared; } static getNumWorkers() { if (process.env.PARCEL_WORKERS) { return parseInt(process.env.PARCEL_WORKERS, 10); } let cores; try { cores = require('physical-cpu-count'); } catch (err) { cores = os.cpus().length; } return cores || 1; } } module.exports = WorkerFarm;