Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ Version 2.2.0
URLs. :issue:`2388`
- The debugger shows enhanced error locations in tracebacks in Python
3.11. :issue:`2407`

- Extracted is_resource_modified and parse_cookie from http.py
to sansio/http.py. :issue:`2408`
- Extracted the ``get_content_length``, ``get_query_string``,
``get_path_info``, and ``extract_path_info`` utility functions from
``wsgi.py`` to ``sansio/utils.py``. :pr:`2415`


Version 2.1.2
-------------
Expand Down
162 changes: 162 additions & 0 deletions src/werkzeug/sansio/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import typing as t

from .._internal import _encode_idna
from .._internal import _to_str
from ..exceptions import SecurityError
from ..urls import _URLTuple
from ..urls import uri_to_iri
from ..urls import url_join
from ..urls import url_parse
from ..urls import url_quote


Expand Down Expand Up @@ -140,3 +144,161 @@ def get_current_url(
url.append(url_quote(query_string, safe=":&%=+$!*'(),"))

return uri_to_iri("".join(url))


def get_content_length(
    http_content_length: t.Union[str, None] = None,
    http_transfer_encoding: t.Union[str, None] = "",
) -> t.Optional[int]:
    """Return the content length as an integer, or ``None`` if it is
    unavailable or chunked transfer encoding is used.

    Transfer coding names are case-insensitive and the header may list
    several comma-separated codings, so the value is normalized before
    looking for ``chunked``.

    :param http_content_length: The Content-Length HTTP header.
    :param http_transfer_encoding: The Transfer-Encoding HTTP header.
    :return: The content length clamped to be non-negative, or ``None``
        if the header is missing, is not a valid integer, or the body
        uses chunked transfer encoding.

    .. versionchanged:: 2.2
        Using explicit header parameters to support ASGI.

    .. versionadded:: 0.9
    """
    # ``None`` is accepted for the header, treat it like an empty value.
    codings = {
        coding.strip().lower()
        for coding in (http_transfer_encoding or "").split(",")
    }

    if "chunked" in codings:
        return None

    if http_content_length is None:
        return None

    try:
        # Negative values are clamped to zero rather than rejected.
        return max(0, int(http_content_length))
    except (ValueError, TypeError):
        # A malformed header is handled the same way as a missing one.
        return None


def get_query_string(query_string: str = "") -> str:
    """Return a sanitized version of the given query string.

    The string is encoded to latin1 bytes and passed through
    ``url_quote`` with the common query delimiters marked as safe.

    :param query_string: The (potentially unsafe) query string.

    .. versionchanged:: 2.2
        Using explicit string parameter to support ASGI.

    .. versionadded:: 0.9
    """
    # A query string really should be ASCII safe, but some browsers
    # (IE in particular) send raw unicode. Quote anything that is not
    # in the safe set instead of trusting the client.
    return url_quote(query_string.encode("latin1"), safe=":&%=+$!*'(),")


def get_path_info(
    path: str = "", charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Return the decoded ``path`` unless ``charset`` is ``None``.

    The path is first encoded to latin1 bytes, then handed to
    ``_to_str`` together with ``charset`` and ``errors``.

    :param path: The URL path.
    :param charset: The charset for the path info, or ``None`` if no
        decoding should be performed.
    :param errors: The decoding error handling.

    .. versionchanged:: 2.2
        Using explicit string parameter to support ASGI.

    .. versionadded:: 0.9
    """
    # Keep the bytes under their own name so ``path`` stays a ``str``.
    raw_path = path.encode("latin1")
    return _to_str(raw_path, charset, errors, allow_none_charset=True)


def extract_path_info(
    baseurl: str,
    path_or_url: t.Union[str, _URLTuple],
    charset: str = "utf-8",
    errors: str = "werkzeug.url_quote",
    collapse_http_schemes: bool = True,
) -> t.Optional[str]:
    """Extracts the path info as a string from the baseurl and path.
    The URLs might also be IRIs.

    If the path info could not be determined, `None` is returned.

    Some examples:

    >>> extract_path_info('http://example.com/app', '/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello')
    '/hello'
    >>> extract_path_info('http://example.com/app',
    ...                   'https://example.com/app/hello',
    ...                   collapse_http_schemes=False) is None
    True

    A path that merely shares a textual prefix with the application
    path is not considered to be below it:

    >>> extract_path_info('http://example.com/app',
    ...                   '/application/hello') is None
    True

    :param baseurl: a base URL or base IRI.
                    This is the root of the application.
    :param path_or_url: an absolute path from the server root, a
                        relative path (in which case it's the path info)
                        or a full URL.
    :param charset: the charset for byte data in URLs
    :param errors: the error handling on decode
    :param collapse_http_schemes: if set to `False` the algorithm does
                                  not assume that http and https on the
                                  same server point to the same
                                  resource.
    :return: the path info with a single leading slash, or `None` if
             the scheme, network location or path do not match the
             base URL.

    .. versionchanged:: 2.2
        Using explicit baseurl string parameter to support ASGI.

    .. versionchanged:: 0.15
        The ``errors`` parameter defaults to leaving invalid bytes
        quoted instead of replacing them.

    .. versionadded:: 0.6
    """

    def _normalize_netloc(scheme: str, netloc: str) -> str:
        # Drop any userinfo and the scheme's default port so that
        # equivalent network locations compare equal.
        parts = netloc.split("@", 1)[-1].split(":", 1)
        port: t.Optional[str]

        if len(parts) == 2:
            netloc, port = parts
            if (scheme == "http" and port == "80") or (
                scheme == "https" and port == "443"
            ):
                port = None
        else:
            netloc = parts[0]
            port = None

        if port is not None:
            netloc += f":{port}"

        return netloc

    # make sure whatever we are working on is a IRI and parse it
    path = uri_to_iri(path_or_url, charset, errors)
    base_iri = uri_to_iri(baseurl, charset, errors)
    base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
    cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

    # normalize the network location
    base_netloc = _normalize_netloc(base_scheme, base_netloc)
    cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

    # is that IRI even on a known HTTP scheme?
    if collapse_http_schemes:
        for scheme in base_scheme, cur_scheme:
            if scheme not in ("http", "https"):
                return None
    else:
        if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
            return None

    # are the netlocs compatible?
    if base_netloc != cur_netloc:
        return None

    # are we below the application path? The match has to end on a
    # path segment boundary, otherwise a base of "/app" would claim
    # "/application" and return a mangled path info.
    base_path = base_path.rstrip("/")
    if cur_path != base_path and not cur_path.startswith(f"{base_path}/"):
        return None

    return f"/{cur_path[len(base_path) :].lstrip('/')}"
107 changes: 35 additions & 72 deletions src/werkzeug/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
from .sansio import utils as _sansio_utils
from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
from .urls import _URLTuple
from .urls import uri_to_iri
from .urls import url_join
from .urls import url_parse
from .urls import url_quote

if t.TYPE_CHECKING:
from _typeshed.wsgi import WSGIApplication
Expand Down Expand Up @@ -122,20 +118,17 @@ def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
integer. If it's not available or chunked transfer encoding is used,
``None`` is returned.

.. versionchanged:: 2.2
Extracted this to sansio/utils.py

.. versionadded:: 0.9

:param environ: the WSGI environ to fetch the content length from.
"""
if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked":
return None

content_length = environ.get("CONTENT_LENGTH")
if content_length is not None:
try:
return max(0, int(content_length))
except (ValueError, TypeError):
pass
return None
return _sansio_utils.get_content_length(
http_content_length=environ.get("CONTENT_LENGTH"),
http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING", ""),
)


def get_input_stream(
Expand Down Expand Up @@ -183,13 +176,12 @@ def get_query_string(environ: "WSGIEnvironment") -> str:

:param environ: WSGI environment to get the query string from.

.. versionchanged:: 2.2
Extracted this to sansio/utils.py

.. versionadded:: 0.9
"""
qs = environ.get("QUERY_STRING", "").encode("latin1")
# QUERY_STRING really should be ascii safe but some browsers
# will send us some unicode stuff (I am looking at you IE).
# In that case we want to urllib quote it badly.
return url_quote(qs, safe=":&%=+$!*'(),")
return _sansio_utils.get_query_string(query_string=environ.get("QUERY_STRING", ""))


def get_path_info(
Expand All @@ -203,10 +195,14 @@ def get_path_info(
decoding should be performed.
:param errors: The decoding error handling.

.. versionchanged:: 2.2
Extracted this to sansio/utils.py

.. versionadded:: 0.9
"""
path = environ.get("PATH_INFO", "").encode("latin1")
return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
return _sansio_utils.get_path_info(
path=environ.get("PATH_INFO", ""), charset=charset, errors=errors
)


def get_script_name(
Expand All @@ -220,10 +216,14 @@ def get_script_name(
should be performed.
:param errors: The decoding error handling.

.. versionchanged:: 2.2
Extracted this to sansio/utils.py

.. versionadded:: 0.9
"""
path = environ.get("SCRIPT_NAME", "").encode("latin1")
return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
return _sansio_utils.get_path_info(
path=environ.get("SCRIPT_NAME", ""), charset=charset, errors=errors
)


def pop_path_info(
Expand Down Expand Up @@ -354,63 +354,26 @@ def extract_path_info(
same server point to the same
resource.

.. versionchanged:: 2.2
Extracted this to sansio/utils.py

.. versionchanged:: 0.15
The ``errors`` parameter defaults to leaving invalid bytes
quoted instead of replacing them.

.. versionadded:: 0.6
"""

def _normalize_netloc(scheme: str, netloc: str) -> str:
parts = netloc.split("@", 1)[-1].split(":", 1)
port: t.Optional[str]

if len(parts) == 2:
netloc, port = parts
if (scheme == "http" and port == "80") or (
scheme == "https" and port == "443"
):
port = None
else:
netloc = parts[0]
port = None

if port is not None:
netloc += f":{port}"

return netloc

# make sure whatever we are working on is a IRI and parse it
path = uri_to_iri(path_or_url, charset, errors)
if isinstance(environ_or_baseurl, dict):
environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

# normalize the network location
base_netloc = _normalize_netloc(base_scheme, base_netloc)
cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

# is that IRI even on a known HTTP scheme?
if collapse_http_schemes:
for scheme in base_scheme, cur_scheme:
if scheme not in ("http", "https"):
return None
baseurl = get_current_url(environ_or_baseurl, root_only=True)
else:
if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
return None

# are the netlocs compatible?
if base_netloc != cur_netloc:
return None

# are we below the application path?
base_path = base_path.rstrip("/")
if not cur_path.startswith(base_path):
return None

return f"/{cur_path[len(base_path) :].lstrip('/')}"
baseurl = environ_or_baseurl
return _sansio_utils.extract_path_info(
baseurl=baseurl,
path_or_url=path_or_url,
charset=charset,
errors=errors,
collapse_http_schemes=collapse_http_schemes,
)


class ClosingIterator:
Expand Down