Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PEP 708 #12813

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/11784.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce repository alternate locations and project tracking, as per PEP 708.
133 changes: 132 additions & 1 deletion src/pip/_internal/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import re
import sys
from itertools import chain, groupby, repeat
from typing import TYPE_CHECKING, Dict, Iterator, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Dict, Iterator, List, Literal, Optional, Set, Union

from pip._vendor.rich.console import Console, ConsoleOptions, RenderResult
from pip._vendor.rich.markup import escape
Expand Down Expand Up @@ -775,3 +775,134 @@ def __init__(self, *, distribution: "BaseDistribution") -> None:
),
hint_stmt=None,
)


class InvalidMultipleRemoteRepositories(DiagnosticPipError):
    """Base class for errors raised while handling multiple remote repositories.

    It carries no behaviour of its own: it supplies a default ``reference``
    and the ``_note_suffix`` text that subclasses append to their notes.
    """

    reference = "invalid-multiple-remote-repositories"

    # Trailing text shared by the notes of the errors deriving from this
    # class: points at PEP 708 and names the command-line override.
    _note_suffix = (
        "See PEP 708 for the specification. You can override this check, "
        "which will disable the security protection it provides from "
        "dependency confusion attacks, by passing "
        "--insecure-multiple-remote-repositories."
    )


class InvalidTracksUrl(InvalidMultipleRemoteRepositories):
    """There was an issue with a Tracks metadata url.

    Tracks urls must point to the actual URLs for that project,
    point to the repositories that own the namespaces, and
    point to a project with the exact same name (after normalization).
    """

    reference = "invalid-tracks-url"

    def __init__(
        self,
        *,
        package: str,
        remote_repositories: Set[str],
        invalid_tracks: Set[str],
    ) -> None:
        """Build the diagnostic for a project whose Tracks metadata is invalid.

        :param package: Name of the project, interpolated into the message.
        :param remote_repositories: Remote repositories to list in the
            message; rendered sorted and separated by ``"; "``.
        :param invalid_tracks: The Tracks entries that were rejected;
            rendered sorted and separated by ``"; "``.
        """
        # NOTE(review): ``escape`` targets Rich markup, while ``Text`` is
        # presumably rendered without markup parsing -- confirm that escaping
        # inside ``Text`` does not leave stray backslashes in the output.
        repositories = "; ".join(sorted(escape(r) for r in remote_repositories))
        tracks = "; ".join(sorted(escape(r) for r in invalid_tracks))
        super().__init__(
            kind="error",
            message=Text(
                f"One or more Tracks for {escape(package)} "
                "were not valid. "
                # The trailing space keeps this sentence from running
                # straight into the next one.
                f"The remote repositories are {repositories}. "
                f"The invalid tracks are {tracks}."
            ),
            context=Text(
                "Tracks urls must point to the actual URLs for a project, "
                "point to the repositories that own the namespaces, and "
                "point to a project with the exact same normalized name."
            ),
            hint_stmt=None,
            note_stmt=Text(
                "The way to resolve this error is to contact the owners of "
                "each remote repository, and ask if it makes sense to "
                "configure them to merge namespaces. " + self._note_suffix
            ),
        )


class InvalidAlternativeLocationsUrl(InvalidMultipleRemoteRepositories):
    """The lists of Alternate Locations of the repositories do not match.

    In order for this metadata to be trusted, there MUST be agreement between
    all locations where that project is found as to what the alternate
    locations are.
    """

    reference = "invalid-alternative-locations"

    def __init__(
        self,
        *,
        package: str,
        remote_repositories: Set[str],
        invalid_locations: Set[str],
    ) -> None:
        """Build the diagnostic for disagreeing Alternate Locations.

        :param package: Name of the project, interpolated into the message.
        :param remote_repositories: Remote repositories to list in the
            message; rendered sorted and separated by ``"; "``.
        :param invalid_locations: The alternate locations reported as not
            agreed on; rendered sorted and separated by ``"; "``.
        """
        # NOTE(review): ``escape`` targets Rich markup, while ``Text`` is
        # presumably rendered without markup parsing -- confirm that escaping
        # inside ``Text`` does not leave stray backslashes in the output.
        repositories = "; ".join(sorted(escape(r) for r in remote_repositories))
        locations = "; ".join(sorted(escape(r) for r in invalid_locations))
        super().__init__(
            kind="error",
            message=Text(
                f"One or more Alternate Locations for {escape(package)} "
                "were different among the remote repositories. "
                # The trailing space keeps this sentence from running
                # straight into the next one.
                f"The remote repositories are {repositories}. "
                "The alternate locations not agreed by all remote "
                f"repositories are {locations}."
            ),
            context=Text(
                "To be able to trust the remote repository Alternate Locations, "
                "all remote repositories must agree on the list of Locations."
            ),
            hint_stmt=None,
            note_stmt=Text(
                "The way to resolve this error is to contact the owners of the package "
                "at each remote repository, and ask if it makes sense to "
                "configure them to merge namespaces. " + self._note_suffix
            ),
        )


class UnsafeMultipleRemoteRepositories(InvalidMultipleRemoteRepositories):
    """Several remote repositories offer a package without a safe-merge signal.

    Neither the repositories, the packages, nor the user indicated that
    the remote repositories may be merged.

    Multiple remote repositories are not merged by default
    to reduce the risk of dependency confusion attacks.
    """

    reference = "unsafe-multiple-remote-repositories"

    def __init__(self, *, package: str, remote_repositories: Set[str]) -> None:
        """Build the diagnostic.

        :param package: Name of the project, interpolated into the message.
        :param remote_repositories: Remote repositories to list in the
            message; rendered sorted and separated by ``"; "``.
        """
        listed = "; ".join(sorted(map(escape, remote_repositories)))

        summary = Text(
            f"More than one remote repository was found for {escape(package)}, "
            "with no indication that the remote repositories can be safely merged. "
            f"The repositories are {listed}."
        )
        background = Text(
            "Multiple remote repositories are not merged by default "
            "to reduce the risk of dependency confusion attacks."
        )
        hint = Text(
            "Remote repositories can be specified or discovered using "
            "--index-url, --extra-index-url, and --find-links. "
            "Please check the pip command to see if these are in use."
        )
        note = Text(
            "The way to resolve this error is to contact the remote repositories "
            "and package owners, and ask if it makes sense to configure them to "
            "merge namespaces. " + self._note_suffix
        )

        super().__init__(
            kind="error",
            message=summary,
            context=background,
            hint_stmt=hint,
            note_stmt=note,
        )
57 changes: 52 additions & 5 deletions src/pip/_internal/index/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Optional,
Protocol,
Sequence,
Set,
Tuple,
Union,
)
Expand All @@ -33,7 +34,12 @@
from pip._vendor.requests.exceptions import RetryError, SSLError

from pip._internal.exceptions import NetworkConnectionError
from pip._internal.models.link import Link
from pip._internal.models.link import (
HEAD_META_ALTERNATE_LOCATIONS,
HEAD_META_PREFIX,
HEAD_META_TRACKS,
Link,
)
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession
from pip._internal.network.utils import raise_for_status
Expand Down Expand Up @@ -224,13 +230,22 @@ def wrapper_wrapper(page: "IndexContent") -> List[Link]:
def parse_links(page: "IndexContent") -> Iterable[Link]:
"""
Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
Includes known metadata from the HTML header.
"""

url = page.url
content_type_l = page.content_type.lower()
if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
data = json.loads(page.content)
project_track_urls = set(data.get("meta", {}).get("tracks", []))
repo_alt_urls = set(data.get("alternate-locations", []))
repo_alt_urls.add(page.url)
for file in data.get("files", []):
link = Link.from_json(file, page.url)
link = Link.from_json(
file,
page_url=page.url,
project_track_urls=project_track_urls,
repo_alt_urls=repo_alt_urls,
)
if link is None:
continue
yield link
Expand All @@ -240,10 +255,17 @@ def parse_links(page: "IndexContent") -> Iterable[Link]:
encoding = page.encoding or "utf-8"
parser.feed(page.content.decode(encoding))

url = page.url
base_url = parser.base_url or url
for anchor in parser.anchors:
link = Link.from_element(anchor, page_url=url, base_url=base_url)
repo_alt_urls = parser.repo_alt_urls or set()
repo_alt_urls.add(page.url)
link = Link.from_element(
anchor,
page_url=url,
base_url=base_url,
project_track_urls=parser.project_track_urls,
repo_alt_urls=repo_alt_urls,
)
if link is None:
continue
yield link
Expand Down Expand Up @@ -282,6 +304,8 @@ def __init__(self, url: str) -> None:
self.url: str = url
self.base_url: Optional[str] = None
self.anchors: List[Dict[str, Optional[str]]] = []
self.project_track_urls: Set[str] = set()
self.repo_alt_urls: Set[str] = set()

def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
if tag == "base" and self.base_url is None:
Expand All @@ -290,13 +314,36 @@ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> N
self.base_url = href
elif tag == "a":
self.anchors.append(dict(attrs))
elif tag == "meta":
meta_attrs = dict(attrs)
meta_key = meta_attrs.get("name", "").strip()
meta_val = meta_attrs.get("content", "").strip()
if meta_key and meta_val:
if (
meta_key == self._meta_key_tracks
and meta_val not in self.project_track_urls
):
self.project_track_urls.add(meta_val)
elif (
meta_key == self._meta_key_alternate_locations
and meta_val not in self.repo_alt_urls
):
self.repo_alt_urls.add(meta_val)

def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
    """Return the value of the first ``href`` attribute in *attrs*.

    The result is ``None`` when no ``href`` attribute is present, and also
    when the first ``href`` attribute itself carries a ``None`` value.
    """
    return next((value for key, value in attrs if key == "href"), None)

@functools.cached_property
def _meta_key_tracks(self) -> str:
    """Name of the ``<meta>`` element carrying a Tracks URL.

    Formed as ``HEAD_META_PREFIX``, a colon, then ``HEAD_META_TRACKS``;
    computed once per instance and cached.
    """
    prefix, field = HEAD_META_PREFIX, HEAD_META_TRACKS
    return f"{prefix}:{field}"

@functools.cached_property
def _meta_key_alternate_locations(self) -> str:
    """Name of the ``<meta>`` element carrying an Alternate Location URL.

    Formed as ``HEAD_META_PREFIX``, a colon, then
    ``HEAD_META_ALTERNATE_LOCATIONS``; computed once per instance and cached.
    """
    prefix, field = HEAD_META_PREFIX, HEAD_META_ALTERNATE_LOCATIONS
    return f"{prefix}:{field}"


def _handle_get_simple_fail(
link: Link,
Expand Down
Loading
Loading