Aktualisiere README und füge ti_status_checker_api.py hinzu
This commit is contained in:
224
.venv/Lib/site-packages/pip/_internal/models/direct_url.py
Normal file
224
.venv/Lib/site-packages/pip/_internal/models/direct_url.py
Normal file
@ -0,0 +1,224 @@
|
||||
""" PEP 610 """
|
||||
|
||||
import json
|
||||
import re
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, ClassVar, Dict, Iterable, Optional, Type, TypeVar, Union
|
||||
|
||||
__all__ = [
|
||||
"DirectUrl",
|
||||
"DirectUrlValidationError",
|
||||
"DirInfo",
|
||||
"ArchiveInfo",
|
||||
"VcsInfo",
|
||||
]
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
DIRECT_URL_METADATA_NAME = "direct_url.json"
|
||||
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
|
||||
|
||||
|
||||
class DirectUrlValidationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _get(
|
||||
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
|
||||
) -> Optional[T]:
|
||||
"""Get value from dictionary and verify expected type."""
|
||||
if key not in d:
|
||||
return default
|
||||
value = d[key]
|
||||
if not isinstance(value, expected_type):
|
||||
raise DirectUrlValidationError(
|
||||
f"{value!r} has unexpected type for {key} (expected {expected_type})"
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
def _get_required(
|
||||
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
|
||||
) -> T:
|
||||
value = _get(d, expected_type, key, default)
|
||||
if value is None:
|
||||
raise DirectUrlValidationError(f"{key} must have a value")
|
||||
return value
|
||||
|
||||
|
||||
def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
|
||||
infos = [info for info in infos if info is not None]
|
||||
if not infos:
|
||||
raise DirectUrlValidationError(
|
||||
"missing one of archive_info, dir_info, vcs_info"
|
||||
)
|
||||
if len(infos) > 1:
|
||||
raise DirectUrlValidationError(
|
||||
"more than one of archive_info, dir_info, vcs_info"
|
||||
)
|
||||
assert infos[0] is not None
|
||||
return infos[0]
|
||||
|
||||
|
||||
def _filter_none(**kwargs: Any) -> Dict[str, Any]:
|
||||
"""Make dict excluding None values."""
|
||||
return {k: v for k, v in kwargs.items() if v is not None}
|
||||
|
||||
|
||||
@dataclass
|
||||
class VcsInfo:
|
||||
name: ClassVar = "vcs_info"
|
||||
|
||||
vcs: str
|
||||
commit_id: str
|
||||
requested_revision: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
|
||||
if d is None:
|
||||
return None
|
||||
return cls(
|
||||
vcs=_get_required(d, str, "vcs"),
|
||||
commit_id=_get_required(d, str, "commit_id"),
|
||||
requested_revision=_get(d, str, "requested_revision"),
|
||||
)
|
||||
|
||||
def _to_dict(self) -> Dict[str, Any]:
|
||||
return _filter_none(
|
||||
vcs=self.vcs,
|
||||
requested_revision=self.requested_revision,
|
||||
commit_id=self.commit_id,
|
||||
)
|
||||
|
||||
|
||||
class ArchiveInfo:
|
||||
name = "archive_info"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hash: Optional[str] = None,
|
||||
hashes: Optional[Dict[str, str]] = None,
|
||||
) -> None:
|
||||
# set hashes before hash, since the hash setter will further populate hashes
|
||||
self.hashes = hashes
|
||||
self.hash = hash
|
||||
|
||||
@property
|
||||
def hash(self) -> Optional[str]:
|
||||
return self._hash
|
||||
|
||||
@hash.setter
|
||||
def hash(self, value: Optional[str]) -> None:
|
||||
if value is not None:
|
||||
# Auto-populate the hashes key to upgrade to the new format automatically.
|
||||
# We don't back-populate the legacy hash key from hashes.
|
||||
try:
|
||||
hash_name, hash_value = value.split("=", 1)
|
||||
except ValueError:
|
||||
raise DirectUrlValidationError(
|
||||
f"invalid archive_info.hash format: {value!r}"
|
||||
)
|
||||
if self.hashes is None:
|
||||
self.hashes = {hash_name: hash_value}
|
||||
elif hash_name not in self.hashes:
|
||||
self.hashes = self.hashes.copy()
|
||||
self.hashes[hash_name] = hash_value
|
||||
self._hash = value
|
||||
|
||||
@classmethod
|
||||
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
|
||||
if d is None:
|
||||
return None
|
||||
return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))
|
||||
|
||||
def _to_dict(self) -> Dict[str, Any]:
|
||||
return _filter_none(hash=self.hash, hashes=self.hashes)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DirInfo:
|
||||
name: ClassVar = "dir_info"
|
||||
|
||||
editable: bool = False
|
||||
|
||||
@classmethod
|
||||
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
|
||||
if d is None:
|
||||
return None
|
||||
return cls(editable=_get_required(d, bool, "editable", default=False))
|
||||
|
||||
def _to_dict(self) -> Dict[str, Any]:
|
||||
return _filter_none(editable=self.editable or None)
|
||||
|
||||
|
||||
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
|
||||
|
||||
|
||||
@dataclass
|
||||
class DirectUrl:
|
||||
url: str
|
||||
info: InfoType
|
||||
subdirectory: Optional[str] = None
|
||||
|
||||
def _remove_auth_from_netloc(self, netloc: str) -> str:
|
||||
if "@" not in netloc:
|
||||
return netloc
|
||||
user_pass, netloc_no_user_pass = netloc.split("@", 1)
|
||||
if (
|
||||
isinstance(self.info, VcsInfo)
|
||||
and self.info.vcs == "git"
|
||||
and user_pass == "git"
|
||||
):
|
||||
return netloc
|
||||
if ENV_VAR_RE.match(user_pass):
|
||||
return netloc
|
||||
return netloc_no_user_pass
|
||||
|
||||
@property
|
||||
def redacted_url(self) -> str:
|
||||
"""url with user:password part removed unless it is formed with
|
||||
environment variables as specified in PEP 610, or it is ``git``
|
||||
in the case of a git URL.
|
||||
"""
|
||||
purl = urllib.parse.urlsplit(self.url)
|
||||
netloc = self._remove_auth_from_netloc(purl.netloc)
|
||||
surl = urllib.parse.urlunsplit(
|
||||
(purl.scheme, netloc, purl.path, purl.query, purl.fragment)
|
||||
)
|
||||
return surl
|
||||
|
||||
def validate(self) -> None:
|
||||
self.from_dict(self.to_dict())
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
|
||||
return DirectUrl(
|
||||
url=_get_required(d, str, "url"),
|
||||
subdirectory=_get(d, str, "subdirectory"),
|
||||
info=_exactly_one_of(
|
||||
[
|
||||
ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
|
||||
DirInfo._from_dict(_get(d, dict, "dir_info")),
|
||||
VcsInfo._from_dict(_get(d, dict, "vcs_info")),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
res = _filter_none(
|
||||
url=self.redacted_url,
|
||||
subdirectory=self.subdirectory,
|
||||
)
|
||||
res[self.info.name] = self.info._to_dict()
|
||||
return res
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s: str) -> "DirectUrl":
|
||||
return cls.from_dict(json.loads(s))
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), sort_keys=True)
|
||||
|
||||
def is_local_editable(self) -> bool:
|
||||
return isinstance(self.info, DirInfo) and self.info.editable
|
Reference in New Issue
Block a user