Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/actions/bulk_cud.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { messages } = require('elasticio-node');
const { Readable } = require('stream');
const util = require('../util');

const { Readable } = require('stream');

const MetaLoader = require('../helpers/metaLoader');
const sfConnection = require('../helpers/sfConnection.js');
Expand Down Expand Up @@ -49,7 +49,7 @@ exports.process = async function bulkCUD(message, configuration) {
timeout = DEFAULT_TIMEOUT;
}

let result = await util.downloadAttachment(message.attachments[key].url);
const result = await util.downloadAttachment(message.attachments[key].url);

const csvStream = new Readable();
csvStream.push(result);
Expand Down
3 changes: 2 additions & 1 deletion lib/actions/createObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ exports.process = async function createObject(message, configuration) {

return messages.newMessageWithBody(message.body);
} catch (err) {
return this.emit('error', err);
await this.emit('error', err);
}
return undefined;
};

exports.getMetaModel = function getMetaModel(cfg, cb) {
Expand Down
19 changes: 11 additions & 8 deletions lib/actions/lookup.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,29 @@ module.exports.process = async function processAction(message, configuration) {
.on('record', (record) => {
res.push(record);
})
.on('end', () => {
.on('end', async () => {
if (!res.length) {
this.emit('data', messages.newMessageWithBody({}));
await this.emit('data', messages.newMessageWithBody({}));
}
if (batchSize > 0) {
while (res.length) {
const result = res.splice(0, batchSize);
this.logger.debug('emitting batch %j', { result });
this.emit('data', messages.newMessageWithBody({ result }));
// eslint-disable-next-line no-await-in-loop
await this.emit('data', messages.newMessageWithBody({ result }));
}
} else {
res.forEach((record) => {
// eslint-disable-next-line no-restricted-syntax
for (const record of res) {
this.logger.debug('emitting record %j', record);
this.emit('data', messages.newMessageWithBody(record));
});
// eslint-disable-next-line no-await-in-loop
await this.emit('data', messages.newMessageWithBody(record));
}
}
})
.on('error', (err) => {
.on('error', async (err) => {
this.logger.error(err);
this.emit('error', err);
await this.emit('error', err);
})
.execute({ autoFetch: true, maxFetch });
};
Expand Down
21 changes: 11 additions & 10 deletions lib/actions/lookupObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ module.exports.process = async function processAction(message, configuration) {

if (!lookupValue) {
if (allowCriteriaToBeOmitted) {
this.emit('data', messages.newMessageWithBody({}));
await this.emit('data', messages.newMessageWithBody({}));
return;
}
const err = new Error('No unique criteria provided');
this.logger.error(err);
this.emit('error', err);
await this.emit('error', err);
return;
}

Expand All @@ -80,7 +80,8 @@ module.exports.process = async function processAction(message, configuration) {
this.logger.info('Cached response found!');
const response = lookupCache.getResponse(queryKey);
// eslint-disable-next-line consistent-return
return this.emit('data', messages.newMessageWithBody(response));
await this.emit('data', messages.newMessageWithBody(response));
return;
}

// the query for the object and all its linked parent objects
Expand All @@ -105,15 +106,15 @@ module.exports.process = async function processAction(message, configuration) {
}, query);

query = query.where(condition)
.on('error', (err) => {
.on('error', async (err) => {
this.logger.error(err);
if (err.message === 'Binary fields cannot be selected in join queries') {
// eslint-disable-next-line no-param-reassign
err.message = 'Binary fields cannot be selected in join queries. '
+ 'Instead of querying objects with binary fields as linked objects '
+ '(such as children Attachments), try querying them directly.';
}
this.emit('error', err);
await this.emit('error', err);
});

query.on('record', (record) => {
Expand All @@ -124,11 +125,11 @@ module.exports.process = async function processAction(message, configuration) {
if (res.length === 0) {
if (allowZeroResults) {
lookupCache.addRequestResponsePair(queryKey, {});
this.emit('data', messages.newMessageWithBody({}));
await this.emit('data', messages.newMessageWithBody({}));
} else {
const err = new Error('No objects found');
this.logger.error(err);
this.emit('error', err);
await this.emit('error', err);
}
} else if (res.length === 1) {
try {
Expand All @@ -143,15 +144,15 @@ module.exports.process = async function processAction(message, configuration) {

lookupCache.addRequestResponsePair(queryKey, res[0]);
this.logger.debug('emitting record %j', outputMessage);
this.emit('data', outputMessage);
await this.emit('data', outputMessage);
} catch (err) {
this.logger.error(err);
this.emit('error', err);
await this.emit('error', err);
}
} else {
const err = new Error('More than one object found');
this.logger.error(err);
this.emit('error', err);
await this.emit('error', err);
}
});

Expand Down
26 changes: 14 additions & 12 deletions lib/actions/lookupObjects.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,19 @@ module.exports.process = async function processAction(message, configuration) {
const responseArray = lookupCache.getResponse(queryKey);
if (configuration.outputMethod === 'emitIndividually') {
if (responseArray.length === 0) {
return this.emit('data', messages.newMessageWithBody({ results: [] }));
await this.emit('data', messages.newMessageWithBody({ results: [] }));
return;
}

for (let i = 0; i < responseArray.length; i += 1) {
// eslint-disable-next-line no-await-in-loop
await this.emit('data', messages.newMessageWithBody({ results: [responseArray[i]] }));
}
return true;
return;
}

return this.emit('data', messages.newMessageWithBody({ results: responseArray }));
await this.emit('data', messages.newMessageWithBody({ results: responseArray }));
return;
}

const records = [];
Expand All @@ -250,20 +252,20 @@ module.exports.process = async function processAction(message, configuration) {
.offset(offset)
.limit(limit)
.scanAll(configuration.includeDeleted)
.on('error', (err) => {
.on('error', async (err) => {
const errExt = _.cloneDeep(err);
errExt.message = `Salesforce returned an error: ${err.message}`;
this.emit('error', errExt);
await this.emit('error', errExt);
});

if (configuration.outputMethod === 'emitIndividually') {
query.on('record', (record) => {
query.on('record', async (record) => {
records.push(record);
this.emit('data', messages.newMessageWithBody({ results: [record] }));
await this.emit('data', messages.newMessageWithBody({ results: [record] }));
})
.on('end', () => {
.on('end', async () => {
if (!query.totalFetched) {
this.emit('data', messages.newMessageWithBody({ results: [] }));
await this.emit('data', messages.newMessageWithBody({ results: [] }));
}
lookupCache.addRequestResponsePair(queryKey, records);

Expand All @@ -273,13 +275,13 @@ module.exports.process = async function processAction(message, configuration) {
query.on('record', (record) => {
records.push(record);
})
.on('end', () => {
.on('end', async () => {
lookupCache.addRequestResponsePair(queryKey, records);
this.emit('data', messages.newMessageWithBody({ results: records }));
await this.emit('data', messages.newMessageWithBody({ results: records }));
this.logger.info(`Got ${query.totalFetched} records`);
});
}

this.logger.info('Sending the request to SalesForce...');
return query.execute({ autoFetch: true, maxFetch: limit });
query.execute({ autoFetch: true, maxFetch: limit });
};
6 changes: 3 additions & 3 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ function addRetryCountInterceptorToAxios(ax) {
return Promise.reject(err);
}
config.currentRetryCount += 1;
return new Promise((resolve) => setTimeout(() => resolve(ax(config)), config.delay));
return new Promise(resolve => setTimeout(() => resolve(ax(config)), config.delay));
});
}


module.exports.base64Encode = (value) => Buffer.from(value).toString('base64');
module.exports.base64Decode = (value) => Buffer.from(value, 'base64').toString('utf-8');
module.exports.base64Encode = value => Buffer.from(value).toString('base64');
module.exports.base64Decode = value => Buffer.from(value, 'base64').toString('utf-8');
module.exports.createSignedUrl = async () => client.resources.storage.createSignedUrl();
module.exports.uploadAttachment = async (url, payload) => {
const ax = axios.create();
Expand Down
37 changes: 31 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.