Skip to content

Commit

Permalink
feat!: convert stream error responses
Browse files Browse the repository at this point in the history
Reject with a parsed JSON object for streaming response cases.

BREAKING CHANGE: Error responses from *AsStream endpoints now
return a JSON object on the rejected promise instead of the raw
stream. Successful responses continue to provide a stream result.
  • Loading branch information
ricellis committed Nov 6, 2024
1 parent 84f762c commit 32e0c25
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 78 deletions.
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"files": "stubs/.+\\.json|package-lock.json|^.secrets.baseline$",
"lines": null
},
"generated_at": "2024-09-18T15:57:23Z",
"generated_at": "2024-11-04T12:08:10Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
Expand Down Expand Up @@ -156,7 +156,7 @@
"hashed_secret": "fc5177639d71196391dd9f4997bc4f240eb29366",
"is_secret": false,
"is_verified": false,
"line_number": 165,
"line_number": 185,
"type": "Secret Keyword",
"verified_result": null
}
Expand Down
20 changes: 14 additions & 6 deletions lib/cloudantBaseService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import { CookieJar } from 'tough-cookie';
import { Agent as HttpsAgent } from 'node:https';
import { Agent as HttpAgent } from 'node:http';
import { CouchdbSessionAuthenticator } from '../auth';
import { errorResponseInterceptor } from './errorResponseInterceptor';
import {
errorResponseInterceptor,
errorResponseStreamConverter,
} from './errorResponseInterceptor';
import { getSdkHeaders } from './common';

/**
Expand Down Expand Up @@ -115,9 +118,17 @@ export default abstract class CloudantBaseService extends BaseService {
this.configureSessionAuthenticator();

// Add response interceptor for error transforms
this.addErrorTransformers();
}

private addErrorTransformers() {
this.getHttpClient().interceptors.response.use(
(response) => response,
errorResponseStreamConverter
);
this.getHttpClient().interceptors.response.use(
(response) => response,
(axiosError) => errorResponseInterceptor(axiosError)
errorResponseInterceptor
);
}

Expand Down Expand Up @@ -150,10 +161,7 @@ export default abstract class CloudantBaseService extends BaseService {
super.configureService(serviceName);
this.configureSessionAuthenticator();
// Add response interceptor for error transforms
this.getHttpClient().interceptors.response.use(
(response) => response,
(axiosError) => errorResponseInterceptor(axiosError)
);
this.addErrorTransformers();
}

/**
Expand Down
63 changes: 61 additions & 2 deletions lib/errorResponseInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,25 @@
* limitations under the License.
*/

const { pipeline } = require('node:stream/promises');
const { Writable, Readable } = require('node:stream');

export function errorResponseInterceptor(axiosError) {
if (
axiosError.response && // must have a response
axiosError.response.status >= 400 &&
// must have data:
axiosError.response.data &&
// must be a JSON
// must be a JSON request
// which also implies Content-Type starts with application/json:
axiosError.config.responseType === 'json' &&
(axiosError.config.responseType === 'json' ||
// or a stream request with application/json
// that has had the response converted to an object
(axiosError.config.responseType === 'stream' &&
axiosError.response.headers['content-type'].startsWith(
'application/json'
) &&
!(axiosError.response.data instanceof Readable))) &&
// must be a valid JSON:
// which also implies it is not a HEAD method:
axiosError.response.data instanceof Object &&
Expand Down Expand Up @@ -53,3 +63,52 @@ export function errorResponseInterceptor(axiosError) {

return Promise.reject(axiosError);
}

/**
* Axios interceptor to convert errors for streaming cases into
* JSON objects.
*
* This means users can handle rejections for AsStream cases in
* the same way as normal error rejections and we can apply the
* errorResponseInterceptor to augment the errors.
*
* @param axiosError
* @returns rejected promise with the stream body transoformed
*/
export async function errorResponseStreamConverter(axiosError) {
if (
axiosError.response && // must have a response
axiosError.response.status >= 400 &&
// must have data:
axiosError.response.data &&
// Must be a JSON response
axiosError.response.headers['content-type']?.startsWith(
'application/json'
) &&
// this is for streaming requests
axiosError.config.responseType === 'stream' &&
// must be a stream
axiosError.response.data instanceof Readable
) {
let data = '';
await pipeline(
axiosError.response.data,
new Writable({
write: (c, e, cb) => {
data += c;
cb();
},
})
);
try {
axiosError.response.data = JSON.parse(data);
} catch (e) {
// JSON parse failure, just use the original
// error stream as a string, matching axios behavior
// for broken JSON
axiosError.response.data = data;
}
}

return Promise.reject(axiosError);
}
112 changes: 44 additions & 68 deletions test/unit/errorResponseInterceptor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

const assert = require('assert');
const nock = require('nock');
const { Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { IncomingMessage } = require('http');
const { getClient } = require('./features/testDataProviders.js');
const {
errorResponseInterceptor,
errorResponseStreamConverter,
} = require('../../lib/errorResponseInterceptor.ts');

// Constants used by most of the test cases:
Expand All @@ -34,6 +32,13 @@ const ERROR_CODE = 459; // not a common error code because nock is able to fail

let service;
const numberOfBaseInterceptors = 1; // responseInterceptor from core exists already
const numberOfErrorInterceptors = 2; // errorResponseStreamConverter and errorResponseInterceptor
const expectedNumberOfResponseInterceptors =
numberOfBaseInterceptors + numberOfErrorInterceptors; // total expected interceptors
// Indexes of interceptors
const firstErrorInterceptorIndex =
expectedNumberOfResponseInterceptors - numberOfErrorInterceptors;
const secondErrorInterceptorIndex = firstErrorInterceptorIndex + 1;

function getCases() {
return [
Expand Down Expand Up @@ -66,11 +71,15 @@ function getCases() {
},
},
{
name: 'test_no_augment_stream',
name: 'test_augment_stream',
requestFunction: 'getDocumentAsStream',
mockResponse: { error: 'foo', reason: 'Bar.' },
// for now in Node we don't augment stream responses:
expectedResponse: { error: 'foo', reason: 'Bar.' },
expectedResponse: {
error: 'foo',
reason: 'Bar.',
errors: [{ code: 'foo', message: 'foo: Bar.' }],
trace: TRACE,
},
mockHeaders: { 'x-couch-request-id': TRACE },
expectedHeaders: {
'x-couch-request-id': TRACE,
Expand Down Expand Up @@ -279,22 +288,6 @@ function getCases() {
];
}

// Take the result stream from an error
// and convert it to a JSON object
async function getJsonErrorFromStreamError(streamError) {
let data = '';
await pipeline(
streamError.result,
new Writable({
write: (c, e, cb) => {
data += c;
cb();
},
})
);
return JSON.parse(data);
}

describe('test errorResponseInterceptor', () => {
beforeEach(() => {
nock.cleanAll();
Expand Down Expand Up @@ -334,12 +327,7 @@ describe('test errorResponseInterceptor', () => {
if (onrejected.headers) {
responseHeaders = onrejected.headers.toJSON();
}
// Get JSON when result is a stream:
if (onrejected.result instanceof IncomingMessage) {
responseResult = await getJsonErrorFromStreamError(onrejected);
} else {
responseResult = onrejected.result;
}
responseResult = onrejected.result;
responseMessage = onrejected.message;
responseStack = onrejected.stack;
responseIsSaved = true;
Expand All @@ -362,58 +350,46 @@ describe('test errorResponseInterceptor', () => {
}
});

it('test_added', async () => {
const expectedNumberOfResponseInterceptors = numberOfBaseInterceptors + 1;
/**
* Assert the error interceptors are present and correctly ordered.
*/
function assertInterceptors() {
assert.strictEqual(
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers.length,
expectedNumberOfResponseInterceptors
);
// check whether errorResponseInterceptor is set as a new interceptor for rejected responses
const actualErrorResponseInterceptor =
// check the 2 error response interceptors are set for rejected responses
// errorResponseStreamConverter and
// errorResponseInterceptor
// and ensure the stream converter is first
const actualFirstRejectionInterceptor =
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers[expectedNumberOfResponseInterceptors - 1].rejected;
assert.deepStrictEqual(
Object.getPrototypeOf(actualErrorResponseInterceptor),
Object.getPrototypeOf(errorResponseInterceptor)
);
});
it('test_added_via_configureService', async () => {
// Before configureService:
let expectedNumberOfResponseInterceptors = numberOfBaseInterceptors + 1;
assert.strictEqual(
.handlers[firstErrorInterceptorIndex].rejected;
const actualSecondRejectionInterceptor =
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers.length,
expectedNumberOfResponseInterceptors
.handlers[secondErrorInterceptorIndex].rejected;
assert.strictEqual(
actualFirstRejectionInterceptor,
errorResponseStreamConverter
);
// check whether errorResponseInterceptor is set as a new interceptor for rejected responses
let actualErrorResponseInterceptor =
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers[expectedNumberOfResponseInterceptors - 1].rejected;
assert.deepStrictEqual(
Object.getPrototypeOf(actualErrorResponseInterceptor),
Object.getPrototypeOf(errorResponseInterceptor)
assert.strictEqual(
actualSecondRejectionInterceptor,
errorResponseInterceptor
);
// get number of service response interceptors before configureService
expectedNumberOfResponseInterceptors =
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers.length; // after configureService we should get the same number of interceptors
}

it('test_added', () => {
assertInterceptors();
});

it('test_added_via_configureService', () => {
// Before configureService:
assertInterceptors();

service.configureService('apple'); // configureService overrides the interceptors as well

// After configureService:
assert.strictEqual(
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers.length,
expectedNumberOfResponseInterceptors
);
// check whether errorResponseInterceptor is set as a new interceptor for rejected responses
actualErrorResponseInterceptor =
service.requestWrapperInstance.axiosInstance.interceptors.response
.handlers[expectedNumberOfResponseInterceptors - 1].rejected;
assert.deepStrictEqual(
Object.getPrototypeOf(actualErrorResponseInterceptor),
Object.getPrototypeOf(errorResponseInterceptor)
);
assertInterceptors();
});
});

0 comments on commit 32e0c25

Please sign in to comment.