
Some records failed while calling PutRecordBatch to Firehose stream, retrying. Individual error codes: ServiceUnavailableException #6

Open
sleworthy opened this issue Aug 27, 2021 · 4 comments

Comments

@sleworthy

Hi there,
Thanks for your work on putting this script together; it's helping us hugely!
We are seeing an error when processing 3,000-4,000 records of various sizes: we are presumably sending too much data to the Firehose stream (metrics suggest we are hitting the bytes-per-second throttling limit), and after that happens 20 times the function errors out.

We see lots of "Some records failed while calling PutRecordBatch to Firehose stream, retrying. Individual error codes: ServiceUnavailableException" in the CloudWatch logs for the function.

Sometimes it seems to get through eventually, but sometimes it hits the 20-retry limit and errors with:

"[ERROR] RuntimeError: Could not put records after 20 attempts. Individual error codes: ServiceUnavailableException"

Looking online, it seems the general fix for such issues is to implement a back-off-and-retry process, as per https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html and https://docs.aws.amazon.com/general/latest/gr/api-retries.html.

I was planning on implementing this in your code, but before doing so I wondered if there was an easier fix you knew of?
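
Roughly what I had in mind is something like this (a minimal sketch only; send_batch is a hypothetical callable that sends one batch and returns whichever records were rejected):

import random
import time

def send_with_backoff(send_batch, records, max_attempts=20):
    # Retry the failed records with exponential back-off plus jitter,
    # per the AWS retry guidance. send_batch is hypothetical: it sends one
    # batch and returns the rejected records (empty list when all succeed).
    for attempt in range(max_attempts):
        records = send_batch(records)
        if not records:
            return
        if attempt + 1 < max_attempts:
            time.sleep(min(2 ** attempt, 30) + random.random())
    raise RuntimeError('Could not put records after %s attempts' % max_attempts)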

Thanks in advance

@Dicondur

The max payload size for Lambda is 6 MB.

boto3 has built-in support for retries; check out the doc below:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#guide-retries
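
For example, you can turn on the standard or adaptive retry mode when creating the client (a minimal sketch; the max_attempts value is just illustrative):

import boto3
from botocore.config import Config

# 'adaptive' (or 'standard') retry mode adds exponential back-off with jitter
# on throttling errors such as ServiceUnavailableException.
retry_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})
client = boto3.client('firehose', config=retry_config)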

@LiamNewtonNHS

Hi there,
We are currently seeing this issue. What was your workaround, if you don't mind me asking?

@Dicondur

We were able to increase the Firehose throughput limits through a service limit increase with AWS support.

Ref: https://docs.aws.amazon.com/firehose/latest/dev/limits.html

@sleworthy (Author)

Hi @LiamNewtonNHS, we implemented a back-off function, which seems to have solved the issue, and we also lowered the batch size.
Some extracts are below. Note this was based on the original, now-deprecated version of the Lambda (not the new one as of 10 months ago).
If this doesn't help, let me know and I'll share the whole Lambda, sanitised (we also added lots of debug logging statements to help us diagnose the issue).

import time

def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMessage = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            errMessage.append(res['ErrorMessage'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)
        errMsg += ' Individual error messages: ' + ','.join(errMessage)

    if len(failedRecords) > 0:
        #If the failure is due to service timeouts then we will need to backoff and allow more time
        serviceThroughputError = 'ServiceUnavailableException'
        if serviceThroughputError in errMsg:
            print('Hitting capacity/throughput limit on Firehose Stream PutRecordBatch. Backing off and slowing down. Sleeping for the following seconds:',attemptsMade/2)
            time.sleep(attemptsMade/2)
            print('Finished sleeping')
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
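
It gets called from the entry function along these lines (illustrative; maxAttempts is 20 to match the error above, and recordsToReingest is the batch built up there):

putRecordsToFirehoseStream(streamName, recordsToReingest, client, attemptsMade=0, maxAttempts=20)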

We also lowered the batch sizes in the entry function like so:

        # We have lowered this to 4000000 as we believe this correlates to the 4 MB PutRecordBatch maximum size AWS states, which cannot be changed.
        if projectedSize > 4000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])
