diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py
index b42bb342..3f774488 100644
--- a/tests/integration/test_filter.py
+++ b/tests/integration/test_filter.py
@@ -4,6 +4,7 @@ from urllib.parse import urlencode
 from urllib.error import HTTPError
 
 import vcr
+from vcr.filters import brotli
 import json
 
 from assertions import assert_cassette_has_one_response, assert_is_json
@@ -118,6 +119,22 @@ def test_decompress_deflate(tmpdir, httpbin):
     assert_is_json(decoded_response)
 
 
+def test_decompress_brotli(tmpdir, httpbin):
+    if brotli is None:
+        # XXX: this is never true, because brotlipy is installed with "httpbin"
+        pytest.skip("Brotli is not installed")
+
+    url = httpbin.url + "/brotli"
+    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
+    cass_file = str(tmpdir.join("brotli_response.yaml"))
+    with vcr.use_cassette(cass_file, decode_compressed_response=True):
+        urlopen(request)
+    with vcr.use_cassette(cass_file) as cass:
+        decoded_response = urlopen(url).read()
+    assert_cassette_has_one_response(cass)
+    assert_is_json(decoded_response)
+
+
 def test_decompress_regular(tmpdir, httpbin):
     """Test that it doesn't try to decompress content that isn't compressed"""
     url = httpbin.url + "/get"
diff --git a/tox.ini b/tox.ini
index 0a5519a9..3655c566 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,8 +3,8 @@ skip_missing_interpreters=true
 envlist =
     cov-clean,
     lint,
-    {py37,py38,py39,py310}-{requests,httplib2,urllib3,tornado4,boto3,aiohttp,httpx},
-    {pypy3}-{requests,httplib2,urllib3,tornado4,boto3},
+    {py37,py38,py39,py310}-{requests,httplib2,urllib3,tornado4,boto3,aiohttp,httpx,brotli,brotlipy,brotlicffi},
+    {pypy3}-{requests,httplib2,urllib3,tornado4,boto3,brotli,brotlipy,brotlicffi},
     {py310}-httpx019,
     cov-report
 
@@ -93,6 +93,9 @@ deps =
     # httpx==0.19 is the latest version that supports allow_redirects, newer versions use follow_redirects
     httpx019: httpx==0.19
     {py37,py38,py39,py310}-{httpx}: pytest-asyncio
+    brotli: brotli
+    brotlipy: brotlipy
+    brotlicffi: brotlicffi
 depends =
     lint,{py37,py38,py39,py310,pypy3}-{requests,httplib2,urllib3,tornado4,boto3},{py37,py38,py39,py310}-{aiohttp},{py37,py38,py39,py310}-{httpx}: cov-clean
     cov-report: lint,{py37,py38,py39,py310,pypy3}-{requests,httplib2,urllib3,tornado4,boto3},{py37,py38,py39,py310}-{aiohttp}
diff --git a/vcr/filters.py b/vcr/filters.py
index 8e00b644..1f22eb2c 100644
--- a/vcr/filters.py
+++ b/vcr/filters.py
@@ -6,6 +6,20 @@
 from .util import CaseInsensitiveDict
 
 
+try:
+    # Both the brotli and brotlipy packages provide the "brotli" module
+    import brotli
+except ImportError:
+    try:
+        import brotlicffi as brotli
+    except ImportError:
+        brotli = None
+
+AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
+if brotli is not None:
+    AVAILABLE_DECOMPRESSORS.add("br")
+
+
 def replace_headers(request, replacements):
     """Replace headers in request according to replacements.
 
@@ -136,15 +150,16 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
 
 def decode_response(response):
     """
-    If the response is compressed with gzip or deflate:
+    If the response is compressed with a supported encoding (gzip,
+    deflate, or br when brotli is available):
       1. decompress the response body
       2. delete the content-encoding header
       3. update content-length header to decompressed length
     """
 
-    def is_compressed(headers):
+    def is_decompressable(headers):
         encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in ("gzip", "deflate")
+        return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS
 
     def decompress_body(body, encoding):
         """Returns decompressed body according to encoding using zlib.
@@ -152,14 +167,16 @@ def decompress_body(body, encoding):
         """
         if encoding == "gzip":
             return zlib.decompress(body, zlib.MAX_WBITS | 16)
-        else:  # encoding == 'deflate'
+        elif encoding == "deflate":
             return zlib.decompress(body)
+        else:  # encoding == 'br'
+            return brotli.decompress(body)
 
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_compressed(headers):
+    if is_decompressable(headers):
         encoding = headers["content-encoding"][0]
         headers["content-encoding"].remove(encoding)
         if not headers["content-encoding"]:
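
Note for reviewers (not part of the patch): a minimal sketch of what the new brotli branch in decode_response does to a recorded response. The dict shape below (list-valued headers, body under ["body"]["string"]) is inferred from how decode_response reads it and may not match the full cassette layout; the payload/compressed names are illustrative only, and either brotli, brotlipy or brotlicffi is assumed to be installed.

# Sketch: feed decode_response() a brotli-compressed body and check the result.
import json

import brotli  # brotlicffi works the same way via "import brotlicffi as brotli"

from vcr.filters import decode_response

payload = json.dumps({"hello": "world"}).encode()  # hypothetical response body
compressed = brotli.compress(payload)

recorded = {
    "headers": {
        "content-encoding": ["br"],
        "content-length": [str(len(compressed))],
    },
    "body": {"string": compressed},
}

decoded = decode_response(recorded)
assert "content-encoding" not in decoded["headers"]  # header dropped once empty
assert decoded["headers"]["content-length"] == [str(len(payload))]
assert json.loads(decoded["body"]["string"]) == {"hello": "world"}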