blob: 92d6902c4ed976cb4a17a895726eb592138f9a36 [file] [log] [blame]
import { AsyncResource } from 'async_hooks';
import { Worker } from 'worker_threads';
import { cpus } from 'os';
import { EventEmitter } from 'events';
import serializeJavascript from 'serialize-javascript';
import { freeWorker, taskInfo, workerPoolWorkerFlag } from './constants';
import type {
WorkerCallback,
WorkerContext,
WorkerOutput,
WorkerPoolOptions,
WorkerPoolTask,
WorkerWithTaskInfo
} from './type';
class WorkerPoolTaskInfo extends AsyncResource {
constructor(private callback: WorkerCallback) {
super('WorkerPoolTaskInfo');
}
done(err: Error | null, result: any) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy();
}
}
export class WorkerPool extends EventEmitter {
protected maxInstances: number;
protected filePath: string;
protected tasks: WorkerPoolTask[] = [];
protected workers: WorkerWithTaskInfo[] = [];
protected freeWorkers: WorkerWithTaskInfo[] = [];
constructor(options: WorkerPoolOptions) {
super();
this.maxInstances = options.maxWorkers || cpus().length;
this.filePath = options.filePath;
this.on(freeWorker, () => {
if (this.tasks.length > 0) {
const { context, cb } = this.tasks.shift()!;
this.runTask(context, cb);
}
});
}
get numWorkers(): number {
return this.workers.length;
}
addAsync(context: WorkerContext): Promise<WorkerOutput> {
return new Promise((resolve, reject) => {
this.runTask(context, (err, output) => {
if (err) {
reject(err);
return;
}
if (!output) {
reject(new Error('The output is empty'));
return;
}
resolve(output);
});
});
}
close() {
for (let i = 0; i < this.workers.length; i++) {
const worker = this.workers[i];
worker.terminate();
}
}
private addNewWorker() {
const worker: WorkerWithTaskInfo = new Worker(this.filePath, {
workerData: workerPoolWorkerFlag
});
worker.on('message', (result) => {
worker[taskInfo]?.done(null, result);
worker[taskInfo] = null;
this.freeWorkers.push(worker);
this.emit(freeWorker);
});
worker.on('error', (err) => {
if (worker[taskInfo]) {
worker[taskInfo].done(err, null);
} else {
this.emit('error', err);
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(freeWorker);
}
private runTask(context: WorkerContext, cb: WorkerCallback) {
if (this.freeWorkers.length === 0) {
this.tasks.push({ context, cb });
if (this.numWorkers < this.maxInstances) {
this.addNewWorker();
}
return;
}
const worker = this.freeWorkers.pop();
if (worker) {
worker[taskInfo] = new WorkerPoolTaskInfo(cb);
worker.postMessage({
code: context.code,
options: serializeJavascript(context.options)
});
}
}
}