Skip to content

Commit f4371b8

Browse files
authored
Merge pull request #10 from not-empty/feature-check-complection
feature check completion
2 parents d3b2ccb + fbde759 commit f4371b8

File tree

4 files changed

+92
-0
lines changed

4 files changed

+92
-0
lines changed

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
AUX_REDIS_HOST=taurus-redis
2+
AUX_REDIS_PORT=6379
3+
14
REDIS_HOST=taurus-redis
25
REDIS_MAX_RETRIES=null
36
REDIS_PORT=6379

README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,43 @@ If you inside the container or want to run locally:
201201
```sh
202202
node sample/multi-producer.js default 60 '{"data":"mydata"}'
203203
```
204+
### Discover when all parallel async jobs are done
205+
206+
If you use parallel processing with multiple workers, finding out when all jobs have completed successfully can be a complicated task due to asynchrony.
207+
208+
To deal with this, Taurus has a functionality that uses a Redis + Lua solution to ensure that the last job in a group was executed, to use it you need to increment a list using a unique specific key when sending to each queue, and decrement this key to each queue that runs successfully.
209+
210+
The last queue will know it is last and will allow you to perform finishing actions.
211+
212+
You need:
213+
214+
1) Fill in the Redis data responsible for control in the .env file (we recommend not being the same person who manages the queues)
215+
216+
```
217+
AUX_REDIS_HOST=taurus-redis
218+
AUX_REDIS_PORT=6379
219+
```
220+
221+
2) When inserting into each queue you can use the CheckCompletion class to increment each job: (If you are outside the Taurus ecosystem you can just create a key in redis with [INCR command](https://redis.io/commands/incr/) with your favorite language.)
222+
223+
```js
224+
const CheckCompletion = require('../core/check-completion.js');
225+
...
226+
const checkCompletion = new CheckCompletion();
227+
await checkCompletion.increment(uniqueKeyToRepresenTheGroup);
228+
```
229+
230+
3) When each queue has finished executing, just call the decrement command and check if it returned zero, if it returned zero, this queue is the last one of this group and you can execute any command you find necessary
231+
232+
```js
233+
const CheckCompletion = require('../core/check-completion.js');
234+
...
235+
const checkCompletion = new CheckCompletion();
236+
const taksRemaining = await checkCompletion.decrement(sameUniqueKeyToRepresenTheGroup);
237+
if (taksRemaining === 0) {
238+
console.log('All jobs are finished');
239+
}
240+
```
204241
205242
### Development
206243

config/redis.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
module.exports = {
2+
auxRedisHost: process.env.AUX_REDIS_HOST || 'localhost',
3+
auxRedisPort: Number(process.env.AUX_REDIS_PORT) || 6379,
4+
};

core/check-completion.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const Redis = require('ioredis');
2+
const configRedis = require('../config/redis');
3+
4+
class CheckCompletion {
5+
constructor() {
6+
this.options = {
7+
host: configRedis.auxRedisHost,
8+
port: configRedis.auxRedisPort,
9+
};
10+
}
11+
12+
async increment(
13+
key
14+
) {
15+
const redis = new Redis(
16+
this.options,
17+
);
18+
19+
const result = await redis.incr(key);
20+
console.log(`Job counter key: ${key}, increased, total jobs: ${result}`);
21+
22+
await redis.disconnect();
23+
return result;
24+
}
25+
26+
async decrement(
27+
key
28+
) {
29+
const redis = new Redis(
30+
this.options,
31+
);
32+
const luaScript = `
33+
local count = redis.call('DECR', KEYS[1])
34+
return count
35+
`;
36+
37+
const result = await redis.eval(luaScript, 1, key);
38+
if (result === 0) {
39+
console.log('Last job completed.');
40+
await redis.del(key);
41+
} else {
42+
console.log(`Jobs remaining: ${result}`);
43+
}
44+
return result;
45+
}
46+
}
47+
48+
module.exports = CheckCompletion;

0 commit comments

Comments
 (0)