diff --git a/src/cron/pullCustomSiloOutstandingIdentifiers.ts b/src/cron/pullCustomSiloOutstandingIdentifiers.ts
index 947163ff..35e190d1 100644
--- a/src/cron/pullCustomSiloOutstandingIdentifiers.ts
+++ b/src/cron/pullCustomSiloOutstandingIdentifiers.ts
@@ -13,12 +13,22 @@ import { RequestAction } from '@transcend-io/privacy-types';
 import { logger } from '../logger';
 import { DEFAULT_TRANSCEND_API } from '../constants';
 import { mapSeries } from 'bluebird';
+// FIXME
+// import groupBy from 'lodash/groupBy';
 
 export interface CronIdentifierWithAction extends CronIdentifier {
   /** The request action that the identifier relates to */
   action: RequestAction;
 }
 
+/**
+ * Cron identifier mode to pull
+ */
+export enum PullCronIdentifiersMode {
+  PerRequest = 'PER_REQUEST',
+  PerIdentifier = 'PER_IDENTIFIER',
+}
+
 /**
  * Pull the set of identifiers outstanding for a cron or AVC integration
  *
@@ -29,6 +39,7 @@ export async function pullCustomSiloOutstandingIdentifiers({
   auth,
   sombraAuth,
   actions,
+  format = PullCronIdentifiersMode.PerRequest,
   pageLimit = 100,
   transcendUrl = DEFAULT_TRANSCEND_API,
 }: {
@@ -38,6 +49,8 @@ export async function pullCustomSiloOutstandingIdentifiers({
   dataSiloId: string;
   /** The request actions to fetch */
   actions: RequestAction[];
+  /** The format to pull the identifiers in */
+  format?: PullCronIdentifiersMode;
   /** Page limit when fetching identifiers */
   pageLimit?: number;
   /** API URL for Transcend backend */
@@ -126,6 +139,22 @@ export async function pullCustomSiloOutstandingIdentifiers({
   );
 
   // Write out to CSV
+  // FIXME
+  // const data =
+  //   format === PullCronIdentifiersMode.PerRequest
+  //     ? identifiers.map(({ attributes, ...identifier }) => ({
+  //         ...identifier,
+  //         ...attributes.reduce(
+  //           (acc, val) =>
+  //             Object.assign(acc, {
+  //               [val.key]: val.values.join(','),
+  //             }),
+  //           {},
+  //         ),
+  //       }))
+  //     : // FIXME
+  //       Object.entries(groupBy(identifiers, 'requestId'));
+
   const data = identifiers.map(({ attributes, ...identifier }) => ({
     ...identifier,
     ...attributes.reduce(
diff --git a/src/requests/constants.ts b/src/requests/constants.ts
index 5720dbaf..b93c5ded 100644
--- a/src/requests/constants.ts
+++ b/src/requests/constants.ts
@@ -7,6 +7,7 @@ import {
   IsoCountrySubdivisionCode,
 } from '@transcend-io/privacy-types';
 import * as t from 'io-ts';
+import { PrivacyRequestInput } from './mapCsvRowsToRequestInputs';
 
 export const NONE = '[NONE]' as const;
 export const BULK_APPLY = '[APPLY VALUE TO ALL ROWS]' as const;
@@ -113,6 +114,14 @@ export type SuccessfulRequest = t.TypeOf<typeof SuccessfulRequest>;
 export const CachedRequestState = t.type({
   /** Set of privacy requests that failed to upload */
   failingRequests: t.array(t.record(t.string, t.any)),
+  // FIXME partial
+  /** Set of privacy requests that are pending upload */
+  pendingRequests: t.array(
+    t.type({
+      rawRow: t.record(t.string, t.any),
+      requestInput: PrivacyRequestInput,
+    }),
+  ),
   /** Successfully uploaded requests */
   successfulRequests: t.array(SuccessfulRequest),
   /** Duplicate requests */
diff --git a/src/requests/uploadPrivacyRequestsFromCsv.ts b/src/requests/uploadPrivacyRequestsFromCsv.ts
index 80fee0f2..a980ffc1 100644
--- a/src/requests/uploadPrivacyRequestsFromCsv.ts
+++ b/src/requests/uploadPrivacyRequestsFromCsv.ts
@@ -106,23 +106,6 @@ export async function uploadPrivacyRequestsFromCsv({
     regionToCountry: {},
   });
 
-  // Create a new state file to store the requests from this run
-  const requestCacheFile = join(
-    requestReceiptFolder,
-    `tr-request-upload-${new Date().toISOString()}-${file
-      .split('/')
-      .pop()}`.replace('.csv', '.json'),
-  );
-  const requestState = new PersistedState(
-    requestCacheFile,
-    CachedRequestState,
-    {
-      successfulRequests: [],
-      duplicateRequests: [],
-      failingRequests: [],
-    },
-  );
-
   // Create sombra instance to communicate with
   const sombra = await createSombraGotInstance(transcendUrl, auth, sombraAuth);
 
@@ -179,11 +162,33 @@ export async function uploadPrivacyRequestsFromCsv({
     requestAttributeKeys,
   });
 
+  // Create a new state file to store the requests from this run
+  const requestCacheFile = join(
+    requestReceiptFolder,
+    `tr-request-upload-${new Date().toISOString()}-${file
+      .split('/')
+      .pop()}`.replace('.csv', '.json'),
+  );
+  const requestState = new PersistedState(
+    requestCacheFile,
+    CachedRequestState,
+    {
+      successfulRequests: [],
+      duplicateRequests: [],
+      failingRequests: [],
+      pendingRequests: requestInputs.map(([rawRow, requestInput]) => ({
+        rawRow,
+        requestInput,
+      })),
+    },
+  );
+
   // start the progress bar with a total value of 200 and start value of 0
   if (!debug) {
     progressBar.start(requestInputs.length, 0);
   }
 
   let total = 0;
+  // Submit each request
   await map(
     requestInputs,
@@ -251,6 +256,7 @@ export async function uploadPrivacyRequestsFromCsv({
         }
 
         // Cache successful upload
+        const pendingRequests = requestState.getValue('pendingRequests');
         const successfulRequests = requestState.getValue('successfulRequests');
         successfulRequests.push({
           id: requestResponse.id,
@@ -260,6 +266,14 @@ export async function uploadPrivacyRequestsFromCsv({
           attemptedAt: new Date().toISOString(),
         });
         requestState.setValue(successfulRequests, 'successfulRequests');
+        requestState.setValue(
+          pendingRequests.filter(
+            (request) =>
+              request.requestInput.coreIdentifier !==
+              requestResponse.coreIdentifier,
+          ),
+          'pendingRequests',
+        );
       } catch (err) {
         const msg = `${err.message} - ${JSON.stringify(
           err.response?.body,
@@ -280,6 +294,7 @@ export async function uploadPrivacyRequestsFromCsv({
               ),
             );
           }
+          const pendingRequests = requestState.getValue('pendingRequests');
           const duplicateRequests = requestState.getValue('duplicateRequests');
           duplicateRequests.push({
             coreIdentifier: requestInput.coreIdentifier,
@@ -287,6 +302,14 @@ export async function uploadPrivacyRequestsFromCsv({
             attemptedAt: new Date().toISOString(),
           });
           requestState.setValue(duplicateRequests, 'duplicateRequests');
+          requestState.setValue(
+            pendingRequests.filter(
+              (request) =>
+                request.requestInput.coreIdentifier !==
+                requestInput.coreIdentifier,
+            ),
+            'pendingRequests',
+          );
         } else {
           const failingRequests = requestState.getValue('failingRequests');
           failingRequests.push({
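
Review note on the commented-out block in pullCustomSiloOutstandingIdentifiers.ts: the second FIXME leaves the non-PER_REQUEST branch unfinished, and the diff does not pin down what a grouped row should look like. Below is one possible completion, written against the CronIdentifierWithAction type and PullCronIdentifiersMode enum from this diff and assuming `identifiers` is the CronIdentifierWithAction[] built earlier in the function; the buildCsvRows helper and the requestId/identifierCount/identifiers column names are illustrative, not part of the PR. It may also be worth double-checking the branch order, since PerRequest currently keeps the one-row-per-identifier behavior while the group-by-requestId logic sits behind PerIdentifier.

```ts
import groupBy from 'lodash/groupBy';

// Illustrative helper (not in the PR): build the CSV rows for either mode.
function buildCsvRows(
  identifiers: CronIdentifierWithAction[],
  format: PullCronIdentifiersMode,
): Record<string, unknown>[] {
  // Flatten an identifier's attributes into CSV columns, same as the existing code
  const flatten = ({
    attributes,
    ...identifier
  }: CronIdentifierWithAction): Record<string, unknown> => ({
    ...identifier,
    ...attributes.reduce<Record<string, string>>(
      (acc, val) => Object.assign(acc, { [val.key]: val.values.join(',') }),
      {},
    ),
  });

  if (format === PullCronIdentifiersMode.PerRequest) {
    // Existing behavior: one row per identifier
    return identifiers.map(flatten);
  }

  // Sketch: one row per request, with that request's identifiers collapsed
  // into a single JSON cell (assumed row shape, adjust as needed)
  return Object.entries(groupBy(identifiers, 'requestId')).map(
    ([requestId, grouped]) => ({
      requestId,
      identifierCount: grouped.length,
      identifiers: JSON.stringify(grouped.map(flatten)),
    }),
  );
}
```

Inside the function, `const data = buildCsvRows(identifiers, format);` would then replace the existing `const data = identifiers.map(...)` assignment.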
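
Review note on the `// FIXME partial` marker in constants.ts: if the intent is to make pendingRequests optional so cache files written before this change still decode, io-ts expresses that by intersecting the required keys with a t.partial block. A minimal, self-contained illustration of the pattern follows; ExampleCachedState and the stand-in requestInput codec are placeholders rather than the real CachedRequestState.

```ts
import * as t from 'io-ts';
import { isRight } from 'fp-ts/Either';

// Required keys stay in t.type; the newly added key moves into t.partial so
// older cache files that lack it still validate.
const ExampleCachedState = t.intersection([
  t.type({
    failingRequests: t.array(t.record(t.string, t.any)),
  }),
  t.partial({
    pendingRequests: t.array(
      t.type({
        rawRow: t.record(t.string, t.any),
        // stand-in for PrivacyRequestInput to keep the example self-contained
        requestInput: t.record(t.string, t.any),
      }),
    ),
  }),
]);

// A cache file written before pendingRequests existed still decodes
console.log(isRight(ExampleCachedState.decode({ failingRequests: [] }))); // true
```

If the key does become optional, the new `requestState.getValue('pendingRequests')` call sites may also need a fallback (for example `?? []`) before calling `.filter`.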
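
Review note on uploadPrivacyRequestsFromCsv.ts: the success and duplicate branches repeat the same getValue/filter/setValue sequence on pendingRequests. A small helper could keep the two call sites in sync; the sketch below is illustrative and not part of the PR. Note also that filtering on coreIdentifier removes every pending entry sharing that identifier, which may or may not be intended if a CSV can contain the same coreIdentifier on more than one row.

```ts
// Minimal stand-in types for the sketch; the real entries come from the
// pendingRequests codec added to CachedRequestState in this PR.
interface PendingEntry {
  /** The raw CSV row as parsed */
  rawRow: Record<string, unknown>;
  /** The mapped privacy request input; only coreIdentifier is needed here */
  requestInput: { coreIdentifier: string };
}

/**
 * Return the pending list without every entry matching the given
 * coreIdentifier. Helper name is illustrative, not part of the PR.
 */
export function withoutPending(
  pendingRequests: PendingEntry[],
  coreIdentifier: string,
): PendingEntry[] {
  return pendingRequests.filter(
    (request) => request.requestInput.coreIdentifier !== coreIdentifier,
  );
}

// Success branch in the diff would become:
//   requestState.setValue(
//     withoutPending(pendingRequests, requestResponse.coreIdentifier),
//     'pendingRequests',
//   );
// and the duplicate branch the same, with requestInput.coreIdentifier.
```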