Skip to content

Commit 95c8cde

Browse files
author
Olha Virolainen
authored
Add retry faceless (#171)
* use retry for faceless * add await to emitter according #149
1 parent b2bb985 commit 95c8cde

File tree

9 files changed

+165
-137
lines changed

9 files changed

+165
-137
lines changed

lib/actions/deleteObject.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ module.exports.process = async function process(message, configuration) {
9090
} else {
9191
if (results.length === 0) {
9292
this.logger.info('No objects are found');
93-
return this.emit('data', messages.newEmptyMessage());
93+
return messages.newEmptyMessage();
9494
}
9595
const err = new Error('More than one object found, can only delete 1');
9696
this.logger.error(err);
97-
return this.emit('error', err);
97+
throw err;
9898
}
9999
}
100100
this.logger.debug(`Preparing to delete a ${configuration.sobject} object...`);

lib/actions/lookupObject.js

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,12 @@ module.exports.process = async function processAction(message, configuration) {
6767

6868
if (!lookupValue) {
6969
if (allowCriteriaToBeOmitted) {
70-
this.emit('data', messages.newMessageWithBody({}));
70+
await this.emit('data', messages.newMessageWithBody({}));
7171
return;
7272
}
7373
const err = new Error('No unique criteria provided');
7474
this.logger.error(err);
75-
this.emit('error', err);
76-
return;
75+
throw err;
7776
}
7877

7978
const meta = await callJSForceMethod.call(this, configuration, 'describe');
@@ -88,8 +87,8 @@ module.exports.process = async function processAction(message, configuration) {
8887
if (lookupCache.hasKey(queryKey)) {
8988
this.logger.info('Cached response found!');
9089
const response = lookupCache.getResponse(queryKey);
91-
// eslint-disable-next-line consistent-return
92-
return this.emit('data', messages.newMessageWithBody(response));
90+
await this.emit('data', messages.newMessageWithBody(response));
91+
return;
9392
}
9493

9594
// the query for the object and all its linked parent objects
@@ -107,11 +106,11 @@ module.exports.process = async function processAction(message, configuration) {
107106
if (records.length === 0) {
108107
if (allowZeroResults) {
109108
lookupCache.addRequestResponsePair(queryKey, {});
110-
this.emit('data', messages.newMessageWithBody({}));
109+
await this.emit('data', messages.newMessageWithBody({}));
111110
} else {
112111
const err = new Error('No objects found');
113112
this.logger.error(err);
114-
this.emit('error', err);
113+
throw (err);
115114
}
116115
} else if (records.length === 1) {
117116
try {
@@ -126,13 +125,14 @@ module.exports.process = async function processAction(message, configuration) {
126125

127126
lookupCache.addRequestResponsePair(queryKey, records[0]);
128127
this.logger.debug('Emitting record');
129-
this.emit('data', outputMessage);
128+
await this.emit('data', outputMessage);
130129
} catch (err) {
131-
this.emit('error', err);
130+
this.logger.error('Lookup Object error occurred');
131+
throw (err);
132132
}
133133
} else {
134134
const err = new Error('More than one object found');
135135
this.logger.error(err);
136-
this.emit('error', err);
136+
throw (err);
137137
}
138138
};

lib/helpers/attachment.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const requestPromise = require('request-promise');
66
const client = require('elasticio-rest-node')();
77

88
const { callJSForceMethod } = require('./wrapper');
9-
const { getCredentials } = require('./oauth2Helper');
9+
const { getSecret } = require('../util');
1010

1111
async function downloadFile(url, headers) {
1212
const optsDownload = {
@@ -71,7 +71,7 @@ exports.getAttachment = async function getAttachment(configuration, objectConten
7171
const binDataUrl = objectContent[binField.name];
7272
if (!binDataUrl) return;
7373

74-
const credentials = await getCredentials(emitter, configuration.secretId);
74+
const { credentials } = await getSecret(emitter, configuration.secretId);
7575
const data = await downloadFile(credentials.undefined_params.instance_url + binDataUrl, {
7676
Authorization: `Bearer ${credentials.accessToken}`,
7777
});

lib/helpers/oauth2Helper.js

Lines changed: 0 additions & 56 deletions
This file was deleted.

lib/helpers/wrapper.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* eslint-disable no-await-in-loop */
22
const { SalesForceClient } = require('../salesForceClient');
3-
const { getCredentials, refreshToken } = require('./oauth2Helper');
3+
const { getSecret, refreshToken } = require('../util');
44
const { REFRESH_TOKEN_RETRIES } = require('../common.js').globalConsts;
55

66
let client;
@@ -12,7 +12,7 @@ exports.callJSForceMethod = async function callJSForceMethod(configuration, meth
1212
const { secretId } = configuration;
1313
if (secretId) {
1414
this.logger.debug('Fetching credentials by secretId');
15-
const credentials = await getCredentials(this, secretId);
15+
const { credentials } = await getSecret(this, secretId);
1616
accessToken = credentials.access_token;
1717
instanceUrl = credentials.undefined_params.instance_url;
1818
} else {

lib/salesForceClient.js

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ class SalesForceClient {
168168
.on('end', () => {
169169
this.logger.debug('Found %s records', results.length);
170170
})
171-
.on('error', (err) => {
172-
this.emit('error', err);
171+
.on('error', async (err) => {
172+
await this.emit('error', err);
173173
})
174174
.execute({ autoFetch: true, maxFetch });
175175
return results;
@@ -222,23 +222,29 @@ class SalesForceClient {
222222
const sobject = options.sobject || this.configuration.sobject;
223223
const includeDeleted = options.includeDeleted || this.configuration.includeDeleted;
224224
const { wherePart, offset, limit } = options;
225-
await this.connection.sobject(sobject)
226-
.select('*')
227-
.where(wherePart)
228-
.offset(offset)
229-
.limit(limit)
230-
.scanAll(includeDeleted)
231-
.on('error', (err) => {
232-
this.logger.error('Salesforce returned an error');
233-
throw err;
234-
})
235-
.on('record', (record) => {
236-
records.push(record);
237-
})
238-
.on('end', () => {
239-
this.logger.debug('Found %s records', records.length);
240-
})
241-
.execute({ autoFetch: true, maxFetch: limit });
225+
try {
226+
await this.connection.sobject(sobject)
227+
.select('*')
228+
.where(wherePart)
229+
.offset(offset)
230+
.limit(limit)
231+
.scanAll(includeDeleted)
232+
.on('error', (err) => {
233+
this.logger.error('Salesforce returned an error');
234+
throw err;
235+
})
236+
.on('record', (record) => {
237+
records.push(record);
238+
})
239+
.on('end', () => {
240+
this.logger.debug('Found %s records', records.length);
241+
})
242+
.execute({ autoFetch: true, maxFetch: limit });
243+
} catch (e) {
244+
this.logger.trace('Lookup query failed', e);
245+
this.logger.error('Lookup query failed');
246+
throw e;
247+
}
242248
return records;
243249
}
244250

lib/triggers/streamPlatformEvents.js

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
const jsforce = require('jsforce');
22
const { messages } = require('elasticio-node');
33
const { callJSForceMethod } = require('../helpers/wrapper');
4-
const { getCredentials, refreshToken } = require('../helpers/oauth2Helper');
5-
const { REFRESH_TOKEN_RETRIES, SALESFORCE_API_VERSION } = require('../common.js').globalConsts;
4+
const { getSecret, refreshToken } = require('../util');
5+
const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts;
66

7+
let fayeClient;
78
/**
89
* This method will be called from elastic.io platform providing following data
910
*
@@ -12,15 +13,13 @@ const { REFRESH_TOKEN_RETRIES, SALESFORCE_API_VERSION } = require('../common.js'
1213
*/
1314
async function processTrigger(msg, configuration) {
1415
this.logger.info('Starting Subscribe to platform events Trigger');
15-
let iteration = REFRESH_TOKEN_RETRIES;
16-
let refreshTokenSuccess = false;
1716
const { secretId } = configuration;
1817
if (!secretId) {
1918
this.logger.error('secretId is missing in configuration, credentials cannot be fetched');
2019
throw new Error('secretId is missing in configuration, credentials cannot be fetched');
2120
}
2221
this.logger.debug('Fetching credentials by secretId');
23-
const credentials = await getCredentials(this, secretId);
22+
const { credentials } = await getSecret(this, secretId);
2423
const accessToken = credentials.access_token;
2524
const instanceUrl = credentials.undefined_params.instance_url;
2625
this.logger.trace('AccessToken = %s', accessToken);
@@ -33,52 +32,46 @@ async function processTrigger(msg, configuration) {
3332
const topic = `/event/${configuration.object}`;
3433
const replayId = -1;
3534
this.logger.debug('Creating streaming client');
36-
const fayeClient = connection.streaming.createClient([
37-
new jsforce.StreamingExtension.Replay(topic, replayId),
38-
new jsforce.StreamingExtension.AuthFailure(async (err) => {
39-
this.logger.trace('AuthFailure: %j', err);
40-
// eslint-disable-next-line max-len
41-
// if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid') && (iteration > 0)) {
42-
this.logger.trace('err.ext.sfdc.failureReason: %s', err.ext.sfdc.failureReason);
43-
if (err.ext.sfdc.failureReason === '401::Authentication invalid') {
44-
do {
45-
iteration -= 1;
35+
if (!fayeClient) {
36+
fayeClient = connection.streaming.createClient([
37+
new jsforce.StreamingExtension.Replay(topic, replayId),
38+
new jsforce.StreamingExtension.AuthFailure(async (err) => {
39+
this.logger.trace('AuthFailure: %j', err);
40+
if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid')) {
4641
try {
47-
this.logger.debug('Session is expired, trying to refresh token, iteration: %s', REFRESH_TOKEN_RETRIES - iteration);
48-
refreshTokenSuccess = await refreshToken(this, secretId);
49-
this.logger.debug('Token is successfully refreshed, iteration: %s', REFRESH_TOKEN_RETRIES - iteration);
50-
break;
42+
this.logger.debug('Session is expired, trying to refresh token');
43+
await refreshToken(this, secretId);
44+
this.logger.debug('Token is successfully refreshed');
5145
} catch (error) {
52-
this.logger.error('Failed to refresh token, iteration: %s', REFRESH_TOKEN_RETRIES - iteration);
46+
this.logger.trace('Refresh token error: %j', error);
47+
this.logger.error('Failed to fetch and/or refresh token');
48+
throw new Error('Failed to fetch and/or refresh token');
5349
}
54-
} while (iteration > 0);
55-
if (!refreshTokenSuccess) {
56-
this.logger.error('Failed to fetch and/or refresh token, retries exceeded');
57-
throw new Error('Failed to fetch and/or refresh token, retries exceeded');
50+
fayeClient = undefined;
51+
this.logger.info('Lets call processTrigger one more time');
52+
await processTrigger.call(this, msg, configuration);
53+
} else {
54+
this.logger.error('AuthFailure extension error occurred');
55+
throw err;
5856
}
59-
this.logger.info('Lets call processTrigger one more time');
60-
await processTrigger.call(this, msg, configuration);
61-
} else {
62-
this.logger.error('AuthFailure extension error occurred');
63-
throw err;
64-
}
65-
}),
66-
]);
57+
}),
58+
]);
6759

68-
fayeClient.subscribe(topic, async (message) => {
69-
this.logger.info('Incoming message found, going to emit...');
70-
this.logger.trace('Incoming Message: %j', message);
71-
await this.emit('data', messages.newMessageWithBody(message));
72-
})
73-
.then(() => {
74-
this.logger.info('Subscribed to PushTopic successfully');
75-
this.logger.trace(`Subscribed to PushTopic: ${topic}`);
76-
},
77-
(err) => {
78-
this.logger.error('Subscriber error occurred');
79-
throw err;
80-
});
81-
this.logger.info('Streaming client created and ready');
60+
fayeClient.subscribe(topic, async (message) => {
61+
this.logger.info('Incoming message found, going to emit...');
62+
this.logger.trace('Incoming Message: %j', message);
63+
await this.emit('data', messages.newMessageWithBody(message));
64+
})
65+
.then(() => {
66+
this.logger.info('Subscribed to PushTopic successfully');
67+
this.logger.trace(`Subscribed to PushTopic: ${topic}`);
68+
},
69+
(err) => {
70+
this.logger.error('Subscriber error occurred');
71+
throw err;
72+
});
73+
this.logger.info('Streaming client created and ready');
74+
}
8275
}
8376

8477
/**

0 commit comments

Comments
 (0)