Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/11017.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for reusable request bodies to enable retries, redirects, and digest authentication -- by :user:`bdraco` and :user:`GLGDLY`.

Most payloads can now be safely reused multiple times, fixing long-standing issues where POST requests with form data or file uploads would fail on redirects with errors like "Form data has been processed already" or "I/O operation on closed file". This also enables digest authentication to work with request bodies and allows retry mechanisms to resend requests without consuming the payload. Note that payloads derived from async iterables may still not be reusable in some cases.
1 change: 1 addition & 0 deletions CHANGES/5530.feature.rst
1 change: 1 addition & 0 deletions CHANGES/5577.feature.rst
1 change: 1 addition & 0 deletions CHANGES/9201.feature.rst
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ Frederik Gladhorn
Frederik Peter Aalund
Gabriel Tremblay
Gang Ji
Gary Leung
Gary Wilson Jr.
Gennady Andreyev
Georges Dubus
Expand Down
12 changes: 12 additions & 0 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ async def _connect_and_send_request(
redirects += 1
history.append(resp)
if max_redirects and redirects >= max_redirects:
if req._body is not None:
await req._body.close()
resp.close()
raise TooManyRedirects(
history[0].request_info, tuple(history)
Expand Down Expand Up @@ -823,13 +825,18 @@ async def _connect_and_send_request(
r_url, encoded=not self._requote_redirect_url
)
except ValueError as e:
if req._body is not None:
await req._body.close()
resp.close()
raise InvalidUrlRedirectClientError(
r_url,
"Server attempted redirecting to a location that does not look like a URL",
) from e

scheme = parsed_redirect_url.scheme
if scheme not in HTTP_AND_EMPTY_SCHEMA_SET:
if req._body is not None:
await req._body.close()
resp.close()
raise NonHttpUrlRedirectClientError(r_url)
elif not scheme:
Expand All @@ -838,6 +845,9 @@ async def _connect_and_send_request(
try:
redirect_origin = parsed_redirect_url.origin()
except ValueError as origin_val_err:
if req._body is not None:
await req._body.close()
resp.close()
raise InvalidUrlRedirectClientError(
parsed_redirect_url,
"Invalid redirect URL origin",
Expand All @@ -854,6 +864,8 @@ async def _connect_and_send_request(

break

if req._body is not None:
await req._body.close()
# check response status
if raise_for_status is None:
raise_for_status = self._raise_for_status
Expand Down
14 changes: 8 additions & 6 deletions aiohttp/client_middleware_digest_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .client_exceptions import ClientError
from .client_middlewares import ClientHandlerType
from .client_reqrep import ClientRequest, ClientResponse
from .payload import Payload


class DigestAuthChallenge(TypedDict, total=False):
Expand Down Expand Up @@ -192,7 +193,7 @@ def __init__(
self._nonce_count = 0
self._challenge: DigestAuthChallenge = {}

def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str:
async def _encode(self, method: str, url: URL, body: Union[bytes, Payload]) -> str:
"""
Build digest authorization header for the current challenge.

Expand All @@ -207,6 +208,7 @@ def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str:
Raises:
ClientError: If the challenge is missing required parameters or
contains unsupported values

"""
challenge = self._challenge
if "realm" not in challenge:
Expand Down Expand Up @@ -272,11 +274,11 @@ def KD(s: bytes, d: bytes) -> bytes:
A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
A2 = f"{method.upper()}:{path}".encode()
if qop == "auth-int":
if isinstance(body, str):
entity_str = body.encode("utf-8", errors="replace")
if isinstance(body, bytes): # will always be empty bytes unless Payload
entity_bytes = body
else:
entity_str = body
entity_hash = H(entity_str)
entity_bytes = await body.as_bytes() # Get bytes from Payload
entity_hash = H(entity_bytes)
A2 = b":".join((A2, entity_hash))

HA1 = H(A1)
Expand Down Expand Up @@ -398,7 +400,7 @@ async def __call__(
for retry_count in range(2):
# Apply authorization header if we have a challenge (on second attempt)
if retry_count > 0:
request.headers[hdrs.AUTHORIZATION] = self._encode(
request.headers[hdrs.AUTHORIZATION] = await self._encode(
request.method, request.url, request.body
)

Expand Down
194 changes: 158 additions & 36 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,25 @@ def _is_expected_content_type(
return expected_content_type in response_content_type


def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
    """Emit a ``ResourceWarning`` when *payload* still needs manual closing.

    A warning is issued only when the payload is neither flagged as
    ``autoclose`` nor already ``consumed``; in every other case the call
    is a no-op.

    The caller is responsible for making sure the body really is a
    Payload before calling this helper.

    Args:
        payload: The payload about to be replaced.
        stacklevel: Forwarded to ``warnings.warn`` so the warning points at
            the code that triggered the replacement (2 fits direct callers).
    """
    # Nothing to report: the payload cleans up after itself or was used up.
    if payload.autoclose or payload.consumed:
        return

    message = (
        "The previous request body contains unclosed resources. "
        "Use await request.update_body() instead of setting request.body "
        "directly to properly close resources and avoid leaks."
    )
    warnings.warn(message, ResourceWarning, stacklevel=stacklevel)


class ClientRequest:
GET_METHODS = {
hdrs.METH_GET,
Expand All @@ -268,7 +287,7 @@ class ClientRequest:
}

# Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
body: Any = b""
_body: Union[None, payload.Payload] = None
auth = None
response = None

Expand Down Expand Up @@ -439,6 +458,36 @@ def host(self) -> str:
def port(self) -> Optional[int]:
return self.url.port

@property
def body(self) -> Union[bytes, payload.Payload]:
    """Return the current request body.

    When no payload is set (or the stored payload is falsy) the empty
    bytes object ``b""`` is returned instead, which keeps older callers
    that expect a bytes-like empty body working.
    """
    current = self._body
    return current if current else b""

@body.setter
def body(self, value: Any) -> None:
    """Replace the request body synchronously.

    If a payload is already stored, a ``ResourceWarning`` may be emitted
    for it (see ``_warn_if_unclosed_payload``) and its synchronous
    ``_close()`` is called before the new value is installed through
    ``_update_body``.

    WARNING: This setter must be called from within an event loop and is
    not thread-safe. Setting body outside of an event loop may raise
    RuntimeError when closing file-based payloads.

    DEPRECATED: Direct assignment to body is deprecated and will be
    removed in a future version. Use ``await update_body()`` instead for
    proper resource management.
    """
    previous = self._body
    if previous is not None:
        # stacklevel=3 so the warning is attributed to the user's code:
        # user code -> body setter -> _warn_if_unclosed_payload
        _warn_if_unclosed_payload(previous, stacklevel=3)
        # NOTE: once synchronous close support is dropped this setter has
        # to go away too, leaving only the async update_body(). Until
        # then _close() is kept for backwards compatibility.
        previous._close()
    self._update_body(value)

@property
def request_info(self) -> RequestInfo:
headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
Expand Down Expand Up @@ -590,9 +639,12 @@ def update_transfer_encoding(self) -> None:
)

self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
else:
if hdrs.CONTENT_LENGTH not in self.headers:
self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
elif (
self._body is not None
and hdrs.CONTENT_LENGTH not in self.headers
and (size := self._body.size) is not None
):
self.headers[hdrs.CONTENT_LENGTH] = str(size)

def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
"""Set basic auth."""
Expand All @@ -610,37 +662,120 @@ def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> Non

self.headers[hdrs.AUTHORIZATION] = auth.encode()

def update_body_from_data(self, body: Any) -> None:
def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
"""Update request body from data."""
if self._body is not None:
_warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)

if body is None:
self._body = None
return

# FormData
if isinstance(body, FormData):
body = body()
maybe_payload = body() if isinstance(body, FormData) else body

try:
body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
except payload.LookupError:
body = FormData(body)()

self.body = body
body_payload = FormData(maybe_payload)() # type: ignore[arg-type]

self._body = body_payload
# enable chunked encoding if needed
if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
if (size := body.size) is not None:
if (size := body_payload.size) is not None:
self.headers[hdrs.CONTENT_LENGTH] = str(size)
else:
self.chunked = True

# copy payload headers
assert body.headers
assert body_payload.headers
headers = self.headers
skip_headers = self._skip_auto_headers
for key, value in body.headers.items():
for key, value in body_payload.headers.items():
if key in headers or (skip_headers is not None and key in skip_headers):
continue
headers[key] = value

def _update_body(self, body: Any) -> None:
    """Update the request body after it has already been set.

    Stale framing headers are dropped first, then the new body is
    installed through ``update_body_from_data`` and the transfer-encoding
    headers are recomputed when the request is expected to carry a body.

    Args:
        body: The new body, in any form ``update_body_from_data`` accepts;
            ``None`` clears the body.
    """
    headers = self.headers

    # The previous Content-Length describes the old body, so discard it.
    if hdrs.CONTENT_LENGTH in headers:
        del headers[hdrs.CONTENT_LENGTH]

    # Likewise drop a leftover Transfer-Encoding header to avoid conflicts.
    if self.chunked and hdrs.TRANSFER_ENCODING in headers:
        del headers[hdrs.TRANSFER_ENCODING]

    # Delegate to the regular update path; stacklevel is one deeper than
    # the default because this wrapper adds a frame between the caller
    # and the warning helper.
    self.update_body_from_data(body, _stacklevel=4)

    # Same rule as in __init__: a body-less request using one of the
    # GET-like methods needs no transfer-encoding handling.
    if body is None and self.method in self.GET_METHODS:
        return
    self.update_transfer_encoding()

async def update_body(self, body: Any) -> None:
    """
    Replace the request body, closing the previous payload first.

    Any payload currently stored on the request is closed (awaited) before
    the new body is installed, so resources held by the old payload are
    released and do not leak.

    IMPORTANT: Prefer this method over assigning to ``request.body``.
    Direct assignment is strongly discouraged because the previous body
    may hold file handles, streams, or other resources that need cleanup.

    Args:
        body: The new body content. Can be:
            - bytes/bytearray: Raw binary data
            - str: Text data (will be encoded using charset from Content-Type)
            - FormData: Form data that will be encoded as multipart/form-data
            - Payload: A pre-configured payload object
            - AsyncIterable: An async iterable of bytes chunks
            - File-like object: Will be read and sent as binary data
            - None: Clears the body

    Usage:
        # CORRECT: Use update_body
        await request.update_body(b"new request data")

        # WRONG: Don't set body directly
        # request.body = b"new request data"  # This will leak resources!

        # Update with form data
        form_data = FormData()
        form_data.add_field('field', 'value')
        await request.update_body(form_data)

        # Clear body
        await request.update_body(None)

    Note:
        The method is a coroutine because closing the previous payload may
        involve closing file handles or other resources. Always await it
        so the cleanup actually runs.

    Warning:
        Setting request.body directly is highly discouraged and can lead to:
        - Resource leaks (unclosed file handles, streams)
        - Memory leaks (unreleased buffers)
        - Unexpected behavior with streaming payloads

        Changing the payload type in middleware is not recommended. If the
        body was already set (e.g., as bytes), keep the same type rather
        than converting it (e.g., to str), as a conversion may result in
        unexpected behavior.

    See Also:
        - update_body_from_data: Synchronous body update without cleanup
        - body property: Direct body access (STRONGLY DISCOURAGED)

    """
    previous = self._body
    # Release whatever the old payload is holding before swapping it out.
    if previous is not None:
        await previous.close()
    self._update_body(body)

def update_expect_continue(self, expect: bool = False) -> None:
if expect:
self.headers[hdrs.EXPECT] = "100-continue"
Expand Down Expand Up @@ -717,27 +852,14 @@ async def write_bytes(
protocol = conn.protocol
assert protocol is not None
try:
if isinstance(self.body, payload.Payload):
# Specialized handling for Payload objects that know how to write themselves
await self.body.write_with_length(writer, content_length)
else:
# Handle bytes/bytearray by converting to an iterable for consistent handling
if isinstance(self.body, (bytes, bytearray)):
self.body = (self.body,)

if content_length is None:
# Write the entire body without length constraint
for chunk in self.body:
await writer.write(chunk)
else:
# Write with length constraint, respecting content_length limit
# If the body is larger than content_length, we truncate it
remaining_bytes = content_length
for chunk in self.body:
await writer.write(chunk[:remaining_bytes])
remaining_bytes -= len(chunk)
if remaining_bytes <= 0:
break
# This should be a rare case but the
# self._body can be set to None while
# the task is being started or we wait above
# for the 100-continue response.
# The more likely case is we have an empty
# payload, but 100-continue is still expected.
if self._body is not None:
await self._body.write_with_length(writer, content_length)
except OSError as underlying_exc:
reraised_exc = underlying_exc

Expand Down Expand Up @@ -833,7 +955,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":
await writer.write_headers(status_line, self.headers)

task: Optional["asyncio.Task[None]"]
if self.body or self._continue is not None or protocol.writing_paused:
if self._body or self._continue is not None or protocol.writing_paused:
coro = self.write_bytes(writer, conn, self._get_content_length())
if sys.version_info >= (3, 12):
# Optimization for Python 3.12, try to write
Expand Down
Loading
Loading