Skip to content

Commit

Permalink
Merge pull request #1104 from ascheel/main
Browse files Browse the repository at this point in the history
Sanitize URLs for logging/display purposes.
  • Loading branch information
sigmavirus24 committed May 15, 2024
2 parents fb9fe50 + c512bbf commit 75de094
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog/1104.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This change will remove sensitive output in URLs for private repositories require a login provided in the format of http(s)://<user>:<pass>@domain.com, replacing the sensitive text with a sanitized "*****:*****" to prevent these details from showing up in log files.
25 changes: 25 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,31 @@ def test_get_repository_config_missing(config_file):
assert utils.get_repository_from_config(config_file, "pypi") == exp


def test_get_repository_config_url_with_auth(config_file):
repository_url = "https://user:pass@notexisting.python.org/pypi"
exp = {
"repository": "https://notexisting.python.org/pypi",
"username": "user",
"password": "pass",
}
assert utils.get_repository_from_config(config_file, "foo", repository_url) == exp
assert utils.get_repository_from_config(config_file, "pypi", repository_url) == exp


@pytest.mark.parametrize(
"input_url, expected_url",
[
("https://upload.pypi.org/legacy/", "https://upload.pypi.org/legacy/"),
(
"https://user:pass@upload.pypi.org/legacy/",
"https://********@upload.pypi.org/legacy/",
),
],
)
def test_sanitize_url(input_url: str, expected_url: str) -> None:
assert utils.sanitize_url(input_url) == expected_url


@pytest.mark.parametrize(
"repo_url, message",
[
Expand Down
6 changes: 3 additions & 3 deletions twine/commands/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None:
# Determine if the user has passed in pre-signed distributions or any attestations.
uploads, signatures, attestations_by_dist = _split_inputs(dists)

print(f"Uploading distributions to {repository_url}")
print(f"Uploading distributions to {utils.sanitize_url(repository_url)}")

packages_to_upload = [
_make_package(
Expand Down Expand Up @@ -250,8 +250,8 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None:
# redirects as well.
if resp.is_redirect:
raise exceptions.RedirectDetected.from_args(
repository_url,
resp.headers["location"],
utils.sanitize_url(repository_url),
utils.sanitize_url(resp.headers["location"]),
)

if skip_upload(resp, upload_settings.skip_existing, package):
Expand Down
35 changes: 30 additions & 5 deletions twine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ def get_config(path: str) -> Dict[str, RepositoryConfig]:
return dict(config)


def sanitize_url(url: str) -> str:
"""Sanitize a URL.
Sanitize URLs, removing any user:password combinations and replacing them with
asterisks. Returns the original URL if the string is a non-matching pattern.
:param url:
str containing a URL to sanitize.
return:
str either sanitized or as entered depending on pattern match.
"""
uri = rfc3986.urlparse(url)
if uri.userinfo:
return cast(str, uri.copy_with(userinfo="*" * 8).unsplit())
return url


def _validate_repository_url(repository_url: str) -> None:
"""Validate the given url for allowed schemes and components."""
# Allowed schemes are http and https, based on whether the repository
Expand All @@ -126,11 +144,7 @@ def get_repository_from_config(
# Prefer CLI `repository_url` over `repository` or .pypirc
if repository_url:
_validate_repository_url(repository_url)
return {
"repository": repository_url,
"username": None,
"password": None,
}
return _config_from_repository_url(repository_url)

try:
config = get_config(config_file)[repository]
Expand All @@ -154,6 +168,17 @@ def get_repository_from_config(
}


def _config_from_repository_url(url: str) -> RepositoryConfig:
parsed = urlparse(url)
config = {"repository": url, "username": None, "password": None}
if parsed.username:
config["username"] = parsed.username
config["password"] = parsed.password
config["repository"] = urlunparse((parsed.scheme, parsed.hostname) + parsed[2:])
config["repository"] = normalize_repository_url(cast(str, config["repository"]))
return config


def normalize_repository_url(url: str) -> str:
parsed = urlparse(url)
if parsed.netloc in _HOSTNAMES:
Expand Down

0 comments on commit 75de094

Please sign in to comment.