5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,9 @@
## 1.3.7 (October 30, 2020)

Upgrade to sailor 2.6.18

## 1.3.6 (October 8, 2020)

Component has been deprecated. Use https://github.com/elasticio/salesforce-component-v2 instead

## 1.3.5 (August 21, 2020)
8 changes: 4 additions & 4 deletions lib/actions/bulk_cud.js
@@ -1,7 +1,7 @@
const { messages } = require('elasticio-node');
const { Readable } = require('stream');
const util = require('../util');

const { Readable } = require('stream');

const MetaLoader = require('../helpers/metaLoader');
const sfConnection = require('../helpers/sfConnection.js');
@@ -30,7 +30,7 @@ exports.objectTypes = async function objectTypes(configuration) {


exports.process = async function bulkCUD(message, configuration) {
this.logger.debug('Starting:', configuration.operation);
this.logger.info('Starting bulkCUD');

const conn = sfConnection.createConnection(configuration, this);

@@ -49,7 +49,7 @@ exports.process = async function bulkCUD(message, configuration) {
timeout = DEFAULT_TIMEOUT;
}

let result = await util.downloadAttachment(message.attachments[key].url);
const result = await util.downloadAttachment(message.attachments[key].url);

const csvStream = new Readable();
csvStream.push(result);
@@ -78,7 +78,7 @@ exports.process = async function bulkCUD(message, configuration) {
out.body = { result: rets };
resolve(out);
}).on('error', (err) => {
this.logger.error('Job error', err.message);
this.logger.error('Job error!');
reject(new Error(err.message));
throw Error(`Job error: ${err.message}`);
});
4 changes: 2 additions & 2 deletions lib/actions/bulk_q.js
@@ -26,15 +26,15 @@ exports.process = async function bulkQuery(message, configuration) {
// Bulk operation ('query')
const stream = conn.bulk.query(message.body.query)
.on('error', (err) => {
this.logger.debug('error query:', err);
this.logger.warn('Bulk query error');
reject(err);
})
.stream();

// upload csv attachment
stream.pipe(request.put(signedUrl.put_url, (err, resp, body) => {
if (err) {
this.logger.debug('error upload:', err);
this.logger.warn('Stream.pipe upload error');
reject(err);
} else {
this.logger.debug('success');
8 changes: 4 additions & 4 deletions lib/actions/createObject.js
@@ -10,12 +10,13 @@ exports.objectTypes = function objectTypes(configuration) {
return metaLoader.getCreateableObjectTypes();
};

// eslint-disable-next-line consistent-return
exports.process = async function createObject(message, configuration) {
this.logger.info(`Preparing to create a ${configuration.sobject} object...`);

const sfConn = sfConnection.createConnection(configuration, this);

this.logger.debug('Creating message body: ', message.body);
this.logger.debug('Creating message body');

const binaryField = await attachment.prepareBinaryData(message, configuration, sfConn, this);

@@ -24,8 +25,7 @@ exports.process = async function createObject(message, configuration) {
try {
const response = await sfConn.sobject(configuration.sobject).create(message.body);

this.logger.debug('SF response: ', response);
this.logger.info(`${configuration.sobject} has been successfully created (ID = ${response.id}).`);
this.logger.info(`${configuration.sobject} has been successfully created.`);
// eslint-disable-next-line no-param-reassign
message.body.id = response.id;

@@ -36,7 +36,7 @@ exports.process = async function createObject(message, configuration) {

return messages.newMessageWithBody(message.body);
} catch (err) {
return this.emit('error', err);
await this.emit('error', err);
}
};

11 changes: 5 additions & 6 deletions lib/actions/deleteObject.js
@@ -14,7 +14,7 @@ module.exports.objectTypes = function objectTypes(configuration) {
};

module.exports.getMetaModel = async function getMetaModel(configuration) {
this.logger.debug(`Get MetaModel is called with config ${JSON.stringify(configuration)}`);
this.logger.debug('Get MetaModel is called');
configuration.metaType = 'lookup';
const metaLoader = new MetaLoader(configuration, this);
const metaData = await metaLoader.loadMetadata();
@@ -72,11 +72,11 @@ module.exports.process = async function process(message, configuration) {
try {
response = await sfConn.sobject(configuration.sobject).delete(message.body.id);
} catch (err) {
this.logger.error(`Salesforce error. ${err.message}`);
this.logger.error('Salesforce error');
return messages.newEmptyMessage();
}

this.logger.debug(`${configuration.sobject} has been successfully deleted (ID = ${response.id}).`);
this.logger.debug(`${configuration.sobject} has been successfully deleted`);
return messages.newMessageWithBody({ response });
}

@@ -104,13 +104,12 @@ module.exports.process = async function process(message, configuration) {
await helpers.deleteObjById.call(this, sfConn, res[0].Id, configuration.sobject);
} else {
const err = new Error('More than one object found, can only delete 1');
this.logger.error(err);
this.logger.trace(`Here are the objects found ${JSON.stringify(res)}`);
this.logger.error('More than one object found, can only delete 1');
await this.emit('error', err);
}
})
.on('error', async (err) => {
this.logger.error(err);
this.logger.error('Delete object failed');
await this.emit('error', err);
})
.run({ autoFetch: true, maxFetch: 2 });
10 changes: 5 additions & 5 deletions lib/actions/lookup.js
@@ -31,7 +31,7 @@ module.exports.getLookupFieldsModel = async function getLookupFieldsModel(config
*/
module.exports.process = async function processAction(message, configuration) {
const batchSize = configuration.batchSize || 0;
this.logger.info('batchSize', batchSize);
this.logger.debug('batchSize', batchSize);
const res = [];
const conn = new jsforce.Connection({
oauth2: {
@@ -45,7 +45,7 @@ module.exports.process = async function processAction(message, configuration) {
});

conn.on('refresh', (accessToken, refreshResult) => {
this.logger.trace('Keys were updated, res=%j', refreshResult);
this.logger.trace('Keys were updated');
this.emit('updateKeys', { oauth: refreshResult });
});

@@ -64,18 +64,18 @@ module.exports.process = async function processAction(message, configuration) {
if (batchSize > 0) {
while (res.length) {
const result = res.splice(0, batchSize);
this.logger.debug('emitting batch %j', { result });
this.logger.debug('emitting batch');
this.emit('data', messages.newMessageWithBody({ result }));
}
} else {
res.forEach((record) => {
this.logger.debug('emitting record %j', record);
this.logger.debug('emitting record');
this.emit('data', messages.newMessageWithBody(record));
});
}
})
.on('error', (err) => {
this.logger.error(err);
this.logger.error('Lookup failed');
this.emit('error', err);
})
.execute({ autoFetch: true, maxFetch });
15 changes: 7 additions & 8 deletions lib/actions/lookupObject.js
@@ -62,7 +62,7 @@ module.exports.process = async function processAction(message, configuration) {
return;
}
const err = new Error('No unique criteria provided');
this.logger.error(err);
this.logger.error('No unique criteria provided');
this.emit('error', err);
return;
}
@@ -75,12 +75,11 @@ module.exports.process = async function processAction(message, configuration) {

lookupCache.useCache(configuration.enableCacheUsage);
const queryKey = lookupCache.generateKeyFromDataArray(configuration.sobject, condition);
this.logger.trace(`Current request key hash: "${queryKey}"`);
if (lookupCache.hasKey(queryKey)) {
this.logger.info('Cached response found!');
const response = lookupCache.getResponse(queryKey);
// eslint-disable-next-line consistent-return
return this.emit('data', messages.newMessageWithBody(response));
return messages.newMessageWithBody(response);
}

// the query for the object and all its linked parent objects
@@ -106,7 +105,7 @@ module.exports.process = async function processAction(message, configuration) {

query = query.where(condition)
.on('error', (err) => {
this.logger.error(err);
this.logger.error('Salesforce lookup error');
if (err.message === 'Binary fields cannot be selected in join queries') {
// eslint-disable-next-line no-param-reassign
err.message = 'Binary fields cannot be selected in join queries. '
@@ -127,7 +126,7 @@ module.exports.process = async function processAction(message, configuration) {
this.emit('data', messages.newMessageWithBody({}));
} else {
const err = new Error('No objects found');
this.logger.error(err);
this.logger.error('Lookup cache failed');
this.emit('error', err);
}
} else if (res.length === 1) {
@@ -142,15 +141,15 @@ module.exports.process = async function processAction(message, configuration) {
}

lookupCache.addRequestResponsePair(queryKey, res[0]);
this.logger.debug('emitting record %j', outputMessage);
this.logger.debug('emitting record');
this.emit('data', outputMessage);
} catch (err) {
this.logger.error(err);
this.logger.error('Emitting response failed');
this.emit('error', err);
}
} else {
const err = new Error('More than one object found');
this.logger.error(err);
this.logger.error('More than one object found');
this.emit('error', err);
}
});
10 changes: 5 additions & 5 deletions lib/actions/lookupObjects.js
@@ -195,7 +195,6 @@ module.exports.process = async function processAction(message, configuration) {
this.logger.info('Building a query...');

const wherePart = await getWherePart(message, configuration, sfConn);
this.logger.debug('Where part: ', wherePart);

let limit;
let offset;
@@ -223,23 +222,24 @@ module.exports.process = async function processAction(message, configuration) {
lookupCache.useCache(configuration.enableCacheUsage);
const queryKey = lookupCache.generateKeyFromDataArray(configuration.sobject, wherePart,
offset, limit, configuration.includeDeleted);
this.logger.trace(`Current request key hash: "${queryKey}"`);
this.logger.trace('Request key cache found.');
if (lookupCache.hasKey(queryKey)) {
this.logger.info('Cached response found!');
const responseArray = lookupCache.getResponse(queryKey);
if (configuration.outputMethod === 'emitIndividually') {
if (responseArray.length === 0) {
return this.emit('data', messages.newMessageWithBody({ results: [] }));
return messages.newMessageWithBody({ results: [] });
}

for (let i = 0; i < responseArray.length; i += 1) {
// eslint-disable-next-line no-await-in-loop
await this.emit('data', messages.newMessageWithBody({ results: [responseArray[i]] }));
}
return true;
// eslint-disable-next-line consistent-return
return;
}

return this.emit('data', messages.newMessageWithBody({ results: responseArray }));
return messages.newMessageWithBody({ results: responseArray });
}

const records = [];
31 changes: 14 additions & 17 deletions lib/actions/query.js
@@ -15,7 +15,7 @@ function getConnection(configuration) {
version: common.globalConsts.SALESFORCE_API_VERSION,
});
conn.on('refresh', (accessToken, res) => {
this.logger.info('Keys were updated, res=%j', res);
this.logger.info('Keys were updated');
this.emit('updateKeys', { oauth: res });
});
return conn;
@@ -35,7 +35,7 @@ async function emitBatch(message, configuration) {
.on('record', (record) => {
batch.push(record);
if (batch.length >= configuration.batchSize) {
logger.info('Ready batch: %j', batch);
logger.info('Ready batch');
promises.push(self.emit('data', messages.newMessageWithBody({ result: batch })));
batch = [];
}
@@ -45,15 +45,15 @@ async function emitBatch(message, configuration) {
promises.push(self.emit('data', messages.newMessageWithBody({})));
}
if (batch.length > 0) {
logger.info('Last batch: %j', batch);
logger.info('Emitting batch');
promises.push(self.emit('data', messages.newMessageWithBody({ result: batch })));
}
logger.info('Total in database=%s', response.totalSize);
logger.info('Total fetched=%s', response.totalFetched);
logger.debug('Total in database=%s', response.totalSize);
logger.debug('Total fetched=%s', response.totalFetched);
resolve();
})
.on('error', (err) => {
logger.error(err);
logger.error('Emit data failed');
promises.push(self.emit('error', err));
reject(err);
})
@@ -72,19 +72,19 @@ async function emitIndividually(message, configuration) {
const response = connection.query(message.body.query)
.scanAll(configuration.includeDeleted)
.on('record', (record) => {
logger.info('Emitting record: %j', record);
logger.info('Emitting record');
promises.push(self.emit('data', messages.newMessageWithBody(record)));
})
.on('end', () => {
if (response.totalFetched === 0) {
promises.push(self.emit('data', messages.newMessageWithBody({})));
}
logger.info('Total in database=%s', response.totalSize);
logger.info('Total fetched=%s', response.totalFetched);
logger.debug('Total in database=%s', response.totalSize);
logger.debug('Total fetched=%s', response.totalFetched);
resolve();
})
.on('error', (err) => {
logger.error(err);
logger.error('Emit data failed');
promises.push(self.emit('error', err));
reject(err);
})
@@ -105,9 +105,8 @@ async function emitAll(message, configuration) {
result.push(record);
})
.on('end', () => {
logger.info('Result: %j', result);
logger.info('Total in database=%s', response.totalSize);
logger.info('Total fetched=%s', response.totalFetched);
logger.debug('Total in database=%s', response.totalSize);
logger.debug('Total fetched=%s', response.totalFetched);
if (response.totalFetched === 0) {
resolve(self.emit('data', messages.newMessageWithBody({})));
}
@@ -116,22 +115,20 @@ async function emitAll(message, configuration) {
}
})
.on('error', (err) => {
logger.error(err);
logger.error('Emit data failed');
reject(self.emit('error', err));
});
});
}

exports.process = async function processAction(message, configuration) {
const { logger } = this;
logger.trace('Input configuration: %j', configuration);
logger.trace('Input message: %j', message);
const batchSize = configuration.batchSize || 0;
// eslint-disable-next-line no-restricted-globals
if (isNaN(batchSize)) {
throw new Error('batchSize must be a number');
}
logger.info('Starting SOQL Select batchSize=%s query=%s', batchSize, message.body.query);
logger.info('Starting SOQL Select Query with batchSize=%s', batchSize);
if (configuration.allowResultAsSet) {
logger.info('Selected EmitAllHandler');
await emitAll.call(this, message, configuration);
6 changes: 2 additions & 4 deletions lib/actions/upsert.js
@@ -43,8 +43,7 @@ module.exports.process = async function upsertObject(message, configuration) {
await attachment.prepareBinaryData(message, configuration, conn, this);

if (message.body.Id) {
this.logger.info('Upserting sobject=%s by internalId', configuration.sobject, message.body.Id);
this.logger.debug('Upserting %s by internalId data: %j', configuration.sobject, message.body.Id, message);
this.logger.info('Upserting sobject=%s by internalId', configuration.sobject);
return conn.sobject(configuration.sobject).update(message.body)
.then(() => {
// eslint-disable-next-line no-param-reassign
@@ -54,8 +53,7 @@ module.exports.process = async function upsertObject(message, configuration) {
});
}

this.logger.info('Upserting sobject: %s by externalId: %s', configuration.sobject, configuration.extIdField);
this.logger.debug('Upserting sobject: %s by externalId:%s data: %j', configuration.sobject, configuration.extIdField, message);
this.logger.info('Upserting sobject: %s by externalId', configuration.sobject);

if (!configuration.extIdField) {
throw Error('Can not find internalId/externalId ids');