Skip to content
This repository was archived by the owner on May 6, 2023. It is now read-only.

Commit c9fa11f

Browse files
committed
Bump version to v0.2.0.
1 parent a479da4 commit c9fa11f

File tree

4 files changed

+97
-20
lines changed

4 files changed

+97
-20
lines changed

lib/pool.d.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@ interface TaskPoolOptions {
66
* A positive number or zero to indicates maximum concurrency limit number, no limitation if it's
77
* set to 0 (similar with `Promise.all()`).
88
*/
9-
concurrency: number;
9+
concurrency?: number;
10+
/**
11+
* Is throws error and terminates tasks if some task threw error, default true.
12+
*/
13+
throwsError?: boolean;
1014
}
1115
export declare class TaskPool extends EventEmitter {
1216
private readonly DEFAULT_CONCURRENCY;
1317
private tasks;
1418
private concurrency;
19+
private throwsError;
1520
private resolutions;
21+
private errors;
1622
private isRunning;
1723
private queue;
1824
private running;
@@ -30,7 +36,12 @@ export declare class TaskPool extends EventEmitter {
3036
* Add a task or tasks array into pool.
3137
* @param task Task instance or task instances array.
3238
*/
33-
addTask(task: Task | Task[]): void;
39+
addTask(task: Task): number;
40+
/**
41+
* Add tasks array into pool.
42+
* @param tasks Task instances array.
43+
*/
44+
addTasks(tasks: Task | Task[]): number[];
3445
/**
3546
* Execute all tasks in the pool.
3647
* @async
@@ -41,9 +52,18 @@ export declare class TaskPool extends EventEmitter {
4152
* @param concurrency The maximum concurrency tasks number.
4253
*/
4354
setConcurrency(concurrency: number): void;
55+
/**
56+
* Gets execution errors of last time.
57+
*/
58+
getErrors(): (Error | undefined)[];
59+
/**
60+
* Get task instance by it's id.
61+
*/
62+
getTask(id: number): Task | null;
4463
private runTask;
4564
private nextTask;
4665
private run;
4766
private checkRunningState;
67+
private handleError;
4868
}
4969
export {};

lib/pool.js

Lines changed: 73 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class TaskPool extends events_1.EventEmitter {
99
this.DEFAULT_CONCURRENCY = 30;
1010
this.tasks = [];
1111
this.resolutions = [];
12+
this.errors = [];
1213
this.isRunning = false;
1314
this.queue = [];
1415
this.running = new Set();
@@ -28,6 +29,12 @@ class TaskPool extends events_1.EventEmitter {
2829
if (typeof this.concurrency !== 'number' || this.concurrency < 0) {
2930
throw new TypeError('Invalid concurrency number');
3031
}
32+
if (typeof options?.throwsError === 'boolean') {
33+
this.throwsError = options.throwsError;
34+
}
35+
else {
36+
this.throwsError = true;
37+
}
3138
this.on('done', this.nextTask);
3239
}
3340
/**
@@ -36,19 +43,36 @@ class TaskPool extends events_1.EventEmitter {
3643
*/
3744
addTask(task) {
3845
if (task instanceof task_1.Task) {
39-
this.tasks.push(task);
46+
const length = this.tasks.push(task);
47+
return length - 1;
4048
}
41-
else if (task instanceof Array) {
42-
task.forEach((t) => {
43-
if (!(t instanceof task_1.Task)) {
44-
throw new TypeError('Invalid task');
45-
}
46-
this.tasks.push(t);
47-
});
49+
// throw error if params is not a task or a tasks array.
50+
throw new TypeError('Invalid task(s)');
51+
}
52+
/**
53+
* Add tasks array into pool.
54+
* @param tasks Task instances array.
55+
*/
56+
addTasks(tasks) {
57+
let taskArr;
58+
if (tasks instanceof Array) {
59+
taskArr = tasks;
60+
}
61+
else if (tasks instanceof task_1.Task) {
62+
taskArr = [tasks];
4863
}
4964
else {
50-
throw new TypeError('Invalid task');
65+
throw new TypeError('Invalid task(s)');
5166
}
67+
const ids = [];
68+
taskArr.forEach((t) => {
69+
if (!(t instanceof task_1.Task)) {
70+
throw new TypeError('Invalid task');
71+
}
72+
const length = this.tasks.push(t);
73+
ids.push(length - 1);
74+
});
75+
return ids;
5276
}
5377
/**
5478
* Execute all tasks in the pool.
@@ -71,19 +95,37 @@ class TaskPool extends events_1.EventEmitter {
7195
}
7296
this.concurrency = concurrency;
7397
}
98+
/**
99+
* Gets execution errors of last time.
100+
*/
101+
getErrors() {
102+
return [...this.errors];
103+
}
104+
/**
105+
* Get task instance by it's id.
106+
*/
107+
getTask(id) {
108+
return id in this.tasks ? this.tasks[id] : null;
109+
}
74110
runTask(index) {
75111
if (index >= this.tasks.length) {
76112
throw new Error('Invalid task');
77113
}
78114
const task = this.tasks[index];
79115
this.running.add(index);
80-
Promise.resolve(task.exec()).then((res) => {
81-
this.running.delete(index);
82-
this.resolutions[index] = res;
83-
this.emit('done');
84-
}, (err) => {
85-
this.emit('error', err);
86-
});
116+
try {
117+
Promise.resolve(task.exec()).then((res) => {
118+
this.running.delete(index);
119+
this.resolutions[index] = res;
120+
this.emit('done');
121+
}, (err) => {
122+
this.running.delete(index);
123+
this.handleError(err, index);
124+
});
125+
}
126+
catch (err) {
127+
this.handleError(err, index);
128+
}
87129
}
88130
nextTask() {
89131
const next = this.queue.shift();
@@ -100,9 +142,13 @@ class TaskPool extends events_1.EventEmitter {
100142
this.running.clear();
101143
this.queue = [];
102144
this.isRunning = true;
145+
// clear last resolutions and errors
146+
this.resolutions = [];
147+
this.errors = [];
103148
// eslint-disable-next-line no-unused-vars
104149
return new Promise((resolve, reject) => {
105150
this.on('error', (err) => {
151+
this.isRunning = false;
106152
reject(err);
107153
});
108154
this.on('completed', () => {
@@ -124,5 +170,16 @@ class TaskPool extends events_1.EventEmitter {
124170
throw new Error('Task pool is executing');
125171
}
126172
}
173+
handleError(err, index) {
174+
if (this.throwsError) {
175+
this.queue = [];
176+
this.emit('error', err);
177+
}
178+
else {
179+
this.running.delete(index);
180+
this.errors[index] = err;
181+
this.emit('done');
182+
}
183+
}
127184
}
128185
exports.TaskPool = TaskPool;

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@antmind/task-pool",
3-
"version": "0.1.2",
3+
"version": "0.2.0",
44
"description": "A simple Node.js functional tasks pool implementation, supported both synchronous and asynchronous functions.",
55
"main": "./lib/index.js",
66
"scripts": {

0 commit comments

Comments
 (0)