salesforce: Add support for `asyncMode` in bulk operation #907

Closed · wants to merge 7 commits
.changeset/moody-crabs-fly.md (5 additions, 0 deletions)

@@ -0,0 +1,5 @@
---
'@openfn/language-salesforce': minor
---

Add `asyncMode` option in `bulk()` function
packages/salesforce/ast.json (4 additions, 0 deletions)

@@ -84,6 +84,10 @@
"title": "state",
"description": "{SalesforceResultState}"
},
{
"title": "state",
"description": "data.batches - Array of batch Information"
},
{
"title": "returns",
"description": null,
packages/salesforce/src/Adaptor.js (44 additions, 23 deletions)

@@ -66,6 +66,7 @@ import flatten from 'lodash/flatten';
* @property {boolean} [failOnError=false] - Fail the operation on error.
* @property {integer} [pollTimeout=240000] - Polling timeout in milliseconds.
* @property {integer} [pollInterval=6000] - Polling interval in milliseconds.
* @property {boolean} [asynMode=false] - Use asyn mode.
> Collaborator: Typo
*/

/**
@@ -159,6 +160,7 @@ export function execute(...operations) {
* @param {array} records - an array of records, or a function which returns an array.
* @param {BulkOptions} [options] - Options to configure the request. In addition to these, you can pass any of the options supported by the {@link https://bit.ly/41tyvVU jsforce API}.
* @state {SalesforceResultState}
 * @state data.batches - Array of batch Information
* @returns {Operation}
*/
export function bulk(sObjectName, operation, records, options = {}) {
@@ -177,6 +179,7 @@
allowNoOp = false,
pollTimeout = 240000,
pollInterval = 6000,
asyncMode = false,
} = resolvedOptions;

const flatRecords = util.removeNestings(resolvedRecords);
@@ -220,36 +223,54 @@
reject(err);
});

-            return batch
-              .on('queue', function (batchInfo) {
-                const batchId = batchInfo.id;
-                var batch = job.batch(batchId);
-                batch.poll(pollInterval, pollTimeout);
-              })
-              .then(async res => {
+            if (asyncMode) {
+              return batch.on('queue', async function (batchInfo) {
+                console.info('Batch queued. Batch ID:', batchInfo.id);
+                resolve({ batchInfo });
> Collaborator: A promise can only resolve once. If there are multiple queue events, for multiple chunks, then they'll never report back to user code. In other words, we'll only ever report back the first batch here.

> Collaborator (author): 🤔 I have tried running the test with a batch size of 1 and I got the expected results. Can you tell me more about this?

> Collaborator (author): I have tested multiple batches by reducing the batch size to 1 and ran an integration test that inserts two records with the option `asyncMode: true`. I can see that I am getting two batch records after the bulk operation is done.

-                await job.close();
-                const errors = res
-                  .map((r, i) => ({ ...r, position: i + 1 }))
-                  .filter(item => {
-                    return !item.success;
-                  });
+              });
+            } else {
+              return batch
+                .on('queue', function (batchInfo) {
+                  const batchId = batchInfo.id;
+                  var batch = job.batch(batchId);
+                  batch.poll(pollInterval, pollTimeout);
+                })
+                .then(async res => {
+                  await job.close();
+                  const errors = res
+                    .map((r, i) => ({ ...r, position: i + 1 }))
+                    .filter(item => {
+                      return !item.success;
+                    });

-                errors.forEach(err => {
-                  err[`${resolvedOptions.extIdField}`] =
-                    chunkedBatch[err.position - 1][resolvedOptions.extIdField];
-                });
-
-                if (failOnError && errors.length > 0) {
-                  console.error('Errors detected:');
-                  reject(JSON.stringify(errors, null, 2));
-                } else {
-                  console.log('Result : ' + JSON.stringify(res, null, 2));
-                  resolve(res);
-                }
+                  errors.forEach(err => {
+                    err[`${resolvedOptions.extIdField}`] =
+                      chunkedBatch[err.position - 1][
+                        resolvedOptions.extIdField
+                      ];
+                  });
+
+                  if (failOnError && errors.length > 0) {
+                    console.error('Errors detected:');
+                    reject(JSON.stringify(errors, null, 2));
+                  } else {
+                    console.log('Result : ' + JSON.stringify(res, null, 2));
+                    resolve(res);
+                  }
+                });
+            }
})
)
).then(results => {
+      // Handle batch results
+      if (asyncMode && results.some(obj => 'batchInfo' in obj)) {
+        return composeNextState(state, {
+          success: true,
+          batches: results.map(r => r.batchInfo).filter(Boolean),
+          errors: [],
+        });
+      }
const allResults = util.formatResults(results.flat());
console.log('Merging results arrays.');
return composeNextState(state, allResults);
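The new result-shaping branch can be exercised in isolation. A standalone sketch of the logic from the hunk above, run against hypothetical per-chunk results (the batch ids are made up):

```javascript
// When asyncMode is on, each chunk resolves with { batchInfo } as soon as it
// is queued; the final .then collects those into a batches array.
const asyncMode = true;
const results = [
  { batchInfo: { id: 'batch-1', state: 'Queued' } },
  { batchInfo: { id: 'batch-2', state: 'Queued' } },
];

let summary;
if (asyncMode && results.some(obj => 'batchInfo' in obj)) {
  summary = {
    success: true,
    batches: results.map(r => r.batchInfo).filter(Boolean),
    errors: [],
  };
}

console.log(summary.batches.map(b => b.id)); // [ 'batch-1', 'batch-2' ]
```

Note the `filter(Boolean)` guard: any chunk that resolved without a `batchInfo` (e.g. via the synchronous path) is simply dropped from `batches`.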
packages/salesforce/test/integration.js (8 additions, 0 deletions)

@@ -17,6 +17,14 @@ describe('Integration tests', () => {
before(async () => {
state.data = [{ name: 'Coco', vera__Active__c: 'No' }];
});
it('should return batchInfo if asyncMode: true', async () => {
const { data } = await execute(
bulk('Account', 'insert', state => state.data, { asyncMode: true })
)(state);

expect(data.batches.length).to.eql(1);
expect(data.success).to.eql(true);
}).timeout(10000);
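For context, a sketch of the `state.data` shape this test expects, based on the `composeNextState` call in the Adaptor.js diff (the batch id and state values are invented examples of jsforce batch info):

```javascript
// Assumed shape of state.data after bulk(..., { asyncMode: true }).
const data = {
  success: true,
  batches: [{ id: '751xx0000000001AAA', state: 'Queued' }],
  errors: [],
};

console.log(data.batches.length); // 1
```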
it('should create multiple sobjects', async () => {
const { data } = await execute(
bulk('Account', 'insert', state => state.data)