Skip to content

Commit

Permalink
[SYNTH-13138] Wait for each ServerResult to be available (#1251)
Browse files Browse the repository at this point in the history
* [synthetics] Move non-exported utils to `batch.ts`

* [SYNTH-13138] Wait for each server result to be available

* Inline `getIncompleteResult()` function
  • Loading branch information
Drarig29 authored Mar 26, 2024
1 parent 74dc408 commit 2f73dbb
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 15 deletions.
141 changes: 139 additions & 2 deletions src/commands/synthetics/__tests__/utils/public.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ describe('utils', () => {
describe('waitForResults', () => {
const batch: Batch = getBatch()
const apiTest = getApiTest(batch.results[0].test_public_id)
const incompleteServerResult = ({eventType: 'created'} as unknown) as ServerResult
const result: Result = {
executionRule: ExecutionRule.BLOCKING,
location: mockLocation.display_name,
Expand Down Expand Up @@ -992,6 +993,142 @@ describe('utils', () => {
expect(mockReporter.testsWait).toHaveBeenCalledTimes(4)
})

test('should wait for incomplete results', async () => {
jest.spyOn(utils, 'wait').mockImplementation(async () => waiter.resolve())

const tests = [result.test, {...result.test, public_id: 'other-public-id'}]

// First ('in_progress')
waiter.start()
mockApi({
getBatchImplementation: async () => ({
status: 'in_progress',
results: [
// First test
{...batch.results[0], status: 'in_progress'},
{...batch.results[0], status: 'passed', result_id: 'rid-2'},
// Second test
{...batch.results[0], status: 'in_progress', test_public_id: 'other-public-id', result_id: 'rid-3'},
] as ResultInBatch[],
}),
pollResultsImplementation: async () => [{...pollResult, resultID: 'rid-2', result: incompleteServerResult}],
})

const resultsPromise = utils.waitForResults(
api,
trigger,
tests,
{
datadogSite: DEFAULT_COMMAND_CONFIG.datadogSite,
failOnCriticalErrors: false,
maxPollingTimeout: 120000,
subdomain: DEFAULT_COMMAND_CONFIG.subdomain,
},
mockReporter
)

// Wait for the 2 tests (initial)
expect(mockReporter.testsWait).toHaveBeenNthCalledWith(1, [tests[0], tests[1]], MOCK_BASE_URL, trigger.batch_id)

await waiter.promise

// One result received
expect(mockReporter.resultReceived).toHaveBeenNthCalledWith(1, {
...batch.results[0],
status: 'passed',
result_id: 'rid-2',
})
// But the data from `/poll_results` data is not available yet, so we should wait more before reporting
expect(mockReporter.resultEnd).not.toHaveBeenCalled()
// Still waiting for 2 tests
expect(mockReporter.testsWait).toHaveBeenNthCalledWith(
2,
[tests[0], tests[1]],
MOCK_BASE_URL,
trigger.batch_id,
0
)

// Third ('in_progress')
waiter.start()
mockApi({
getBatchImplementation: async () => ({
status: 'in_progress',
results: [
// First test
{...batch.results[0], status: 'passed'},
{...batch.results[0], status: 'passed', result_id: 'rid-2'},
// Second test
{...batch.results[0], status: 'in_progress', test_public_id: 'other-public-id', result_id: 'rid-3'},
] as ResultInBatch[],
}),
pollResultsImplementation: async () => [
{...pollResult, result: incompleteServerResult}, // not available yet
deepExtend({}, pollResult, {resultID: 'rid-2'}), // just became available
],
})

await waiter.promise

// One result received
expect(mockReporter.resultReceived).toHaveBeenNthCalledWith(2, {
...batch.results[0],
status: 'passed',
})
// Result 2 just became available, so it should be reported
expect(mockReporter.resultEnd).toHaveBeenNthCalledWith(1, {...result, resultId: 'rid-2'}, MOCK_BASE_URL)
// Now waiting for 1 test
expect(mockReporter.testsWait).toHaveBeenNthCalledWith(3, [tests[1]], MOCK_BASE_URL, trigger.batch_id, 0)

// Last ('passed')
mockApi({
getBatchImplementation: async () => ({
status: 'passed',
results: [
// First test
{...batch.results[0], status: 'passed'},
{...batch.results[0], status: 'passed', result_id: 'rid-2'},
// Second test
{...batch.results[0], status: 'passed', test_public_id: 'other-public-id', result_id: 'rid-3'},
] as ResultInBatch[],
}),
pollResultsImplementation: async () => [
{...pollResult, result: incompleteServerResult}, // still not available
deepExtend({}, pollResult, {resultID: 'rid-2'}),
deepExtend({}, pollResult, {resultID: 'rid-3'}),
],
})

expect(await resultsPromise).toEqual([
{...result, resultId: 'rid', result: incompleteServerResult},
{...result, resultId: 'rid-2'},
{...result, resultId: 'rid-3'},
])

// One result received
expect(mockReporter.resultReceived).toHaveBeenNthCalledWith(3, {
...batch.results[0],
status: 'passed',
test_public_id: 'other-public-id',
result_id: 'rid-3',
})
// Result 3 was available instantly
expect(mockReporter.resultEnd).toHaveBeenNthCalledWith(2, {...result, resultId: 'rid-3'}, MOCK_BASE_URL)

// Result 1 never became available
expect(mockReporter.resultEnd).toHaveBeenNthCalledWith(
3,
{...result, resultId: 'rid', result: incompleteServerResult},
MOCK_BASE_URL
)
expect(mockReporter.log).toHaveBeenCalledWith(
'The full information for result rid was incomplete at the end of the batch.'
)

// Do not report when there are no tests to wait anymore
expect(mockReporter.testsWait).toHaveBeenCalledTimes(3)
})

test('object in each result should be different even if they share the same public ID (config overrides)', async () => {
mockApi({
getBatchImplementation: async () => ({
Expand Down Expand Up @@ -1071,8 +1208,8 @@ describe('utils', () => {
expect(mockReporter.resultEnd).toHaveBeenNthCalledWith(2, expectedTimeoutResult, MOCK_BASE_URL)
})

test('results failure should ignore if timed-out', async () => {
// The original failure of a result received between timing-out in batch poll
test('results failure should be ignored if timed out', async () => {
// The original failure of a result received between timing out in batch poll
// and retrieving it should be ignored in favor of timeout.
mockApi({
getBatchImplementation: async () => ({
Expand Down
108 changes: 95 additions & 13 deletions src/commands/synthetics/batch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import deepExtend from 'deep-extend'

import {APIHelper, EndpointError, formatBackendErrors} from './api'
import {Batch, MainReporter, PollResultMap, Result, ResultDisplayInfo, ResultInBatch, Test, Trigger} from './interfaces'
import {
BaseResultInBatch,
Batch,
MainReporter,
PollResultMap,
Result,
ResultDisplayInfo,
ResultInBatch,
Test,
Trigger,
} from './interfaces'
import {isResultInBatchSkippedBySelectiveRerun, getResultIdOrLinkedResultId} from './utils/internal'
import {wait, getAppBaseURL, hasResultPassed} from './utils/public'

Expand All @@ -16,6 +26,7 @@ export const waitForBatchToFinish = async (
): Promise<Result[]> => {
const maxPollingDate = Date.now() + maxPollingTimeout
const emittedResultIndexes = new Set<number>()
let oldIncompleteResultIds = new Set<string>()

while (true) {
const batch = await getBatch(api, trigger)
Expand All @@ -25,20 +36,31 @@ export const waitForBatchToFinish = async (
// But `hasBatchExceededMaxPollingDate` is a safety in case it fails to do that.
const shouldContinuePolling = batch.status === 'in_progress' && !hasBatchExceededMaxPollingDate

const receivedResults = reportReceivedResults(batch, emittedResultIndexes, reporter)
const residualResults = batch.results.filter((_, index) => !emittedResultIndexes.has(index))
const newlyReceivedResults = reportReceivedResults(batch, emittedResultIndexes, reporter)

// For the last iteration, the full up-to-date data has to be fetched to compute this function's return value,
// while only the [received + residual] results have to be reported.
const resultIdsToFetch = (shouldContinuePolling ? receivedResults : batch.results).flatMap((r) =>
isResultInBatchSkippedBySelectiveRerun(r) ? [] : [r.result_id]
const resultIdsToFetch = getResultIdsToFetch(
shouldContinuePolling,
batch,
newlyReceivedResults,
oldIncompleteResultIds
)
const resultsToReport = receivedResults.concat(shouldContinuePolling ? [] : residualResults)

const pollResultMap = await getPollResultMap(api, resultIdsToFetch)
const {pollResultMap, incompleteResultIds} = await getPollResultMap(api, resultIdsToFetch)

const resultsToReport = getResultsToReport(
shouldContinuePolling,
batch,
newlyReceivedResults,
emittedResultIndexes,
oldIncompleteResultIds,
incompleteResultIds,
reporter
)

reportResults(resultsToReport, pollResultMap, resultDisplayInfo, hasBatchExceededMaxPollingDate, reporter)

oldIncompleteResultIds = incompleteResultIds

if (!shouldContinuePolling) {
return batch.results.map((r) =>
getResultFromBatch(r, pollResultMap, resultDisplayInfo, hasBatchExceededMaxPollingDate)
Expand All @@ -51,6 +73,52 @@ export const waitForBatchToFinish = async (
}
}

const getResultIdsToFetch = (
shouldContinuePolling: boolean,
batch: Batch,
newlyReceivedResults: ResultInBatch[],
oldIncompleteResultIds: Set<string>
): string[] => {
// For the last iteration, the full up-to-date data has to be fetched to compute the return value of `waitForResults()`.
if (!shouldContinuePolling) {
return getResultIds(batch.results)
}

return getResultIds(newlyReceivedResults).concat(...oldIncompleteResultIds)
}

const getResultsToReport = (
shouldContinuePolling: boolean,
batch: Batch,
newlyReceivedResults: ResultInBatch[],
emittedResultIndexes: Set<number>,
oldIncompleteResultIds: Set<string>,
incompleteResultIds: Set<string>,
reporter: MainReporter
): ResultInBatch[] => {
const newlyCompleteResults = excludeSkipped(batch.results).filter(
(r) => oldIncompleteResultIds.has(r.result_id) && !incompleteResultIds.has(r.result_id)
)

const resultsToReport = newlyReceivedResults
.filter((r) => isResultInBatchSkippedBySelectiveRerun(r) || !incompleteResultIds.has(r.result_id))
.concat(newlyCompleteResults)

if (shouldContinuePolling) {
return resultsToReport
}

const residualResults = excludeSkipped(batch.results).filter(
(r, index) => !emittedResultIndexes.has(index) || incompleteResultIds.has(r.result_id)
)

for (const result of residualResults) {
reporter.log(`The full information for result ${result.result_id} was incomplete at the end of the batch.`)
}

return resultsToReport.concat(residualResults)
}

const reportReceivedResults = (batch: Batch, emittedResultIndexes: Set<number>, reporter: MainReporter) => {
const receivedResults: ResultInBatch[] = []

Expand Down Expand Up @@ -176,16 +244,30 @@ const getBatch = async (api: APIHelper, trigger: Trigger): Promise<Batch> => {
}
}

const getTestByPublicId = (id: string, tests: Test[]): Test => tests.find((t) => t.public_id === id)!

const getPollResultMap = async (api: APIHelper, resultIds: string[]) => {
try {
const pollResults = await api.pollResults(resultIds)

const pollResultMap: PollResultMap = {}
pollResults.forEach((r) => (pollResultMap[r.resultID] = r))
const incompleteResultIds = new Set<string>()

pollResults.forEach((r) => {
// When they are initialized in the backend, results only contain an `eventType: created` property.
if ('eventType' in r.result && r.result.eventType === 'created') {
incompleteResultIds.add(r.resultID)
}
pollResultMap[r.resultID] = r
})

return pollResultMap
return {pollResultMap, incompleteResultIds}
} catch (e) {
throw new EndpointError(`Failed to poll results: ${formatBackendErrors(e)}\n`, e.response?.status)
}
}

const getTestByPublicId = (id: string, tests: Test[]): Test => tests.find((t) => t.public_id === id)!

const getResultIds = (results: ResultInBatch[]): string[] => excludeSkipped(results).map((r) => r.result_id)

const excludeSkipped = (results: ResultInBatch[]) =>
results.filter((r): r is BaseResultInBatch => !isResultInBatchSkippedBySelectiveRerun(r))

0 comments on commit 2f73dbb

Please sign in to comment.