Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 'I/O operation on closed file' and 'Form data has been processed already' upon redirect on multipart data #9201

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

GLGDLY
Copy link

@GLGDLY GLGDLY commented Sep 19, 2024

What do these changes do?

Fix 'I/O operation on closed file' and 'Form data has been processed already' upon redirect on multipart data, based on the discussion in #6853 .

This approach tries to pre-build Payload object for the data passing into the request, before the redirect while True loop starts, so we can reuse the same Payload object for the entire redirect chain. However, it is notable that I/O object is always an issue, so my thought here is to use the seek operation on the I/O object to allow chunk writing on redirect requests.

Yet, for non-seekable I/O objects, some possible solution are:

  • sending expect 100 first
  • local buffering
  • raise error

I think more discussion might be needed for non-seekable objects, so I just raise error in this PR first.

Another thing I think worth discussion is that I removed the close() operation on the I/O object in this PR due to the following reasons:

  • StringIOPayload do not close its I/O value in the original code,
  • Closing of the I/O value will make the payload un-reusable, even if it is seekable
  • We are not the one to create the I/O value, so maybe we should leave for user to close it.
  • standard I/O object will be closed upon destructure, so it won't affect the user interface

But I do think more discussions might be needed here.

Are there changes in behavior for the user?

No.

Is it a substantial burden for the maintainers to support this?

Yes.

Related issue number

Fixes #5577
Fixes #5530

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • If you provide code modification, please add yourself to CONTRIBUTORS.txt
    • The format is <Name> <Surname>.
    • Please keep alphabetical order, the file is sorted by names.
  • Add a new news fragment into the CHANGES/ folder
    • name it <issue_or_pr_num>.<type>.rst (e.g. 588.bugfix.rst)

    • if you don't have an issue number, change it to the pull request
      number after creating the PR

      • .bugfix: A bug fix for something the maintainers deemed an
        improper undesired behavior that got corrected to match
        pre-agreed expectations.
      • .feature: A new behavior, public APIs. That sort of stuff.
      • .deprecation: A declaration of future API removals and breaking
        changes in behavior.
      • .breaking: When something public is removed in a breaking way.
        Could be deprecated in an earlier release.
      • .doc: Notable updates to the documentation structure or build
        process.
      • .packaging: Notes for downstreams about unobvious side effects
        and tooling. Changes in the test invocation considerations and
        runtime assumptions.
      • .contrib: Stuff that affects the contributor experience. e.g.
        Running tests, building the docs, setting up the development
        environment.
      • .misc: Changes that are hard to assign to any of the above
        categories.
    • Make sure to use full sentences with correct case and punctuation,
      for example:

      Fixed issue with non-ascii contents in doctest text files
      -- by :user:`contributor-gh-handle`.

      Use the past tense or the present tense a non-imperative mood,
      referring to what's changed compared to the last released version
      of this project.

Copy link

codecov bot commented Sep 19, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.32%. Comparing base (8911419) to head (c4da788).

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #9201      +/-   ##
==========================================
+ Coverage   98.31%   98.32%   +0.01%     
==========================================
  Files         107      107              
  Lines       34510    34765     +255     
  Branches     4100     4137      +37     
==========================================
+ Hits        33929    34184     +255     
  Misses        410      410              
  Partials      171      171              
Flag Coverage Δ
CI-GHA 98.22% <100.00%> (+0.01%) ⬆️
OS-Linux 97.89% <100.00%> (+0.01%) ⬆️
OS-Windows 96.32% <100.00%> (+0.02%) ⬆️
OS-macOS 97.57% <100.00%> (+0.01%) ⬆️
Py-3.10.11 96.12% <100.00%> (-1.53%) ⬇️
Py-3.10.15 97.60% <100.00%> (+0.01%) ⬆️
Py-3.11.10 97.49% <100.00%> (+0.01%) ⬆️
Py-3.11.9 97.56% <100.00%> (+0.01%) ⬆️
Py-3.12.6 97.94% <100.00%> (+0.01%) ⬆️
Py-3.9.13 97.55% <100.00%> (+0.01%) ⬆️
Py-3.9.20 97.49% <100.00%> (+0.01%) ⬆️
Py-pypy7.3.16 97.11% <100.00%> (+0.01%) ⬆️
VM-macos 97.57% <100.00%> (+0.01%) ⬆️
VM-ubuntu 97.89% <100.00%> (+0.01%) ⬆️
VM-windows 96.32% <100.00%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

aiohttp/payload.py Outdated Show resolved Hide resolved
aiohttp/formdata.py Outdated Show resolved Hide resolved
@Dreamsorcerer
Copy link
Member

At first glance, it looks like some reasonable changes for an awkward situation.

@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Sep 20, 2024
@Dreamsorcerer Dreamsorcerer added backport-3.10 Trigger automatic backporting to the 3.10 release branch by Patchback robot backport-3.11 Trigger automatic backporting to the 3.11 release branch by Patchback robot labels Sep 20, 2024
aiohttp/payload.py Outdated Show resolved Hide resolved
Copy link
Member

@Dreamsorcerer Dreamsorcerer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good, I'd like @bdraco to have a quick check as well though.

@bdraco
Copy link
Member

bdraco commented Sep 21, 2024

I'm on the way back home from Europe and will take a look this weekend if I'm not too jet lagged. Otherwise I'll take a look when the week starts

async def write(self, writer: AbstractStreamWriter) -> None:
loop = asyncio.get_event_loop()
try:
if self._seekable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: see if executor jobs can be combined

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be combined into a single executor job. Example

diff --git a/aiohttp/payload.py b/aiohttp/payload.py
index 395c44f6e..3b222b93e 100644
--- a/aiohttp/payload.py
+++ b/aiohttp/payload.py
@@ -319,15 +319,19 @@ class IOBasePayload(Payload):
         else:
             self._stream_pos = 0
 
-    async def write(self, writer: AbstractStreamWriter) -> None:
-        loop = asyncio.get_event_loop()
+    def _read_first(self) -> None:
+        """Read the first chunk of data from the stream."""
         if self._seekable:
-            await loop.run_in_executor(None, self._value.seek, self._stream_pos)
+            self._value.seek(self._stream_pos)
         elif not self._writable:
             raise RuntimeError(
                 f'Non-seekable IO payload "{self._value}" is already consumed (possibly due to redirect, consider storing in a seekable IO buffer instead)'
             )
-        chunk = await loop.run_in_executor(None, self._value.read, 2**16)
+        return self._value.read(2**16)
+
+    async def write(self, writer: AbstractStreamWriter) -> None:
+        loop = asyncio.get_running_loop()
+        chunk = await loop.run_in_executor(None, self._read_first)
         while chunk:
             await writer.write(chunk)
             chunk = await loop.run_in_executor(None, self._value.read, 2**16)

@@ -354,40 +373,50 @@ def __init__(
@property
def size(self) -> Optional[int]:
try:
return os.fstat(self._value.fileno()).st_size - self._value.tell()
return os.fstat(self._value.fileno()).st_size - self._stream_pos
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: verify this doesn't run in the event loop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like multipart will call this in the event loop from append_payload via ClientRequest.update_body_from_data

return self._value.read()

async def write(self, writer: AbstractStreamWriter) -> None:
loop = asyncio.get_event_loop()
try:
if self._seekable:
await loop.run_in_executor(None, self._value.seek, self._stream_pos)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: see if executor jobs can be combined

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jobs can be combined like #9201 (comment)

self._value.seek(position)
return end - position
def size(self) -> Optional[int]:
if self._seekable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: verify this doesn't run in the event loop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
if self._seekable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: make sure this is run in the executor

except OSError:
return None

def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
if self._seekable:
self._value.seek(self._stream_pos)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: verify this isn't called in the event loop as it does block

except OSError:
return None

def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
if self._seekable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to seek on the first time?

await writer.write(data)
chunk = await loop.run_in_executor(None, self._value.read, 2**16)
finally:
await loop.run_in_executor(None, self._value.close)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: verify close still happens in failure

@@ -406,6 +435,8 @@ def size(self) -> Optional[int]:
return None

def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
if self._seekable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: verify this does not run in the event loop

@@ -406,6 +435,8 @@ def size(self) -> Optional[int]:
return None

def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
if self._seekable:
self._value.seek(self._stream_pos)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to happen on the first attempt?

self._writable = True

try:
self._seekable = self._value.seekable()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think seekable can be blocking , at least aiofiles delegates it the executor https://pypi.org/project/aiofiles/

@bdraco
Copy link
Member

bdraco commented Sep 22, 2024

It looks like there are a few places where blocking I/O is happening in the event loop that need to be addressed before we can move this forward

@GLGDLY
Copy link
Author

GLGDLY commented Sep 23, 2024

after the discussion with cpython community, I think seekable() is not appropriate in our case. I will figure out another approach, maybe just check if seeking backward is callable, only when we really need to do so (e.g. in redirect)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-3.10 Trigger automatic backporting to the 3.10 release branch by Patchback robot backport-3.11 Trigger automatic backporting to the 3.11 release branch by Patchback robot bot:chronographer:provided There is a change note present in this PR
Projects
None yet
3 participants