/
cli.py
420 lines (327 loc) · 15.5 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""Console script for phylum-init."""
import argparse
import os
import pathlib
import platform
import shutil
import subprocess
import sys
import tempfile
import zipfile
from functools import lru_cache
from pathlib import Path
from typing import List, Optional, Tuple
import requests
from packaging.utils import canonicalize_version
from packaging.version import InvalidVersion, Version
from phylum import __version__
from phylum.constants import (
MIN_SUPPORTED_CLI_VERSION,
REQ_TIMEOUT,
SUPPORTED_ARCHES,
SUPPORTED_PLATFORMS,
TOKEN_ENVVAR_NAME,
)
from phylum.init import SCRIPT_NAME
from phylum.init.sig import verify_minisig
from ruamel.yaml import YAML
def use_legacy_paths(version):
    """Report whether a given Phylum CLI version still uses the legacy file locations.

    Versions up to and including v2.2.0 count as legacy. The Phylum config and binary paths changed
    after that release, to adhere to the XDG Base Directory Spec.
    Reference: https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
    """
    candidate = Version(canonicalize_version(version))
    last_legacy_release = Version("v2.2.0")
    return candidate <= last_legacy_release
def get_phylum_settings_path(version):
    """Return the location of the Phylum `settings.yaml` file for a provided version.

    The version is validated and normalized with `version_check` first. Legacy versions keep the file
    under `~/.phylum`. Other versions keep it in a `phylum` directory under `XDG_CONFIG_HOME`, with
    `~/.config` standing in when that environment variable is unset or empty.
    """
    user_home = pathlib.Path.home()
    checked_version = version_check(version)

    xdg_config_home = os.getenv("XDG_CONFIG_HOME") or user_home / ".config"

    if use_legacy_paths(checked_version):
        return user_home / ".phylum" / "settings.yaml"
    return pathlib.Path(xdg_config_home) / "phylum" / "settings.yaml"
def get_expected_phylum_bin_path(version):
    """Return the path where the Phylum CLI binary is expected for a provided version.

    The version is validated and normalized with `version_check` first. Legacy versions are expected at
    `~/.phylum/phylum` and all other versions at `~/.local/bin/phylum`.
    """
    user_home = pathlib.Path.home()
    checked_version = version_check(version)

    if use_legacy_paths(checked_version):
        return user_home / ".phylum" / "phylum"
    return user_home / ".local" / "bin" / "phylum"
def get_phylum_cli_version(cli_path: Path) -> str:
    """Get the version of the Phylum CLI found at the given path and return it.

    The binary is run with `--version`. Its output is stripped of surrounding whitespace, lower-cased,
    and returned without the leading `phylum ` program name when that prefix is present.

    A non-zero exit status from the CLI is raised as `subprocess.CalledProcessError`.
    """
    # Build the command as an argument list instead of splitting a formatted string on whitespace,
    # so that a binary path containing spaces stays intact as a single argument.
    cmd = [str(cli_path), "--version"]
    version = subprocess.run(cmd, check=True, capture_output=True, text=True).stdout.strip().lower()

    # Starting with Python 3.9, the str.removeprefix() method was introduced to do this same thing
    prefix = "phylum "
    if version.startswith(prefix):
        version = version[len(prefix) :]

    return version
def get_phylum_bin_path(version: Optional[str] = None) -> Tuple[Optional[Path], Optional[str]]:
    """Get the current path and corresponding version to the Phylum CLI binary and return them.

    Provide a CLI version as a fallback method for looking on an explicit path,
    based on the expected path for that version.

    Returns a `(path, version)` tuple, or `(None, None)` when no `phylum` executable can be found.
    """
    # Look for `phylum` on the PATH first
    which_cli_path = shutil.which("phylum")

    if which_cli_path is None and version is not None:
        # Maybe `phylum` is installed already but not on the PATH or maybe the PATH has not been updated in this
        # context. Look in the specific location expected by the provided version.
        expected_cli_path = get_expected_phylum_bin_path(version)
        # The `path` argument of `shutil.which` is a search path made of *directories*, so hand it the
        # directory expected to hold the binary rather than the full path of the binary itself.
        which_cli_path = shutil.which("phylum", path=str(expected_cli_path.parent))

    if which_cli_path is None:
        return (None, None)

    cli_path = Path(which_cli_path)
    cli_version = get_phylum_cli_version(cli_path)
    return cli_path, cli_version
def get_latest_version():
    """Look up the "latest" Phylum CLI release on GitHub and return its tag name."""
    # API Reference: https://docs.github.com/en/rest/releases/releases#get-the-latest-release
    response = requests.get(
        "https://api.github.com/repos/phylum-dev/cli/releases/latest",
        headers={"Accept": "application/vnd.github+json"},
        timeout=REQ_TIMEOUT,
    )
    response.raise_for_status()

    # Read the "tag_name" entry instead of "name": the release name could be set to something other than
    # the version, while tags are much more tightly coupled with the release version.
    return response.json().get("tag_name")
@lru_cache(maxsize=1)
def supported_releases() -> List[str]:
    """Fetch the most recent Phylum CLI releases from GitHub and return the supported tag names.

    A single request asks for up to 100 releases. The result is memoized with `lru_cache`, so repeated
    calls reuse the first answer.
    """
    # API Reference: https://docs.github.com/en/rest/releases/releases#list-releases
    response = requests.get(
        "https://api.github.com/repos/phylum-dev/cli/releases",
        headers={"Accept": "application/vnd.github+json"},
        params={"per_page": 100},
        timeout=REQ_TIMEOUT,
    )
    response.raise_for_status()

    # Work from the "tag_name" entry instead of "name": the release name could be set to something other
    # than the version, while tags are much more tightly coupled with the release version.
    supported: List[str] = []
    for release in response.json():
        if is_supported_version(release.get("tag_name", "0.0.0")):
            supported.append(release.get("tag_name"))
    return supported
def is_supported_version(version: str) -> bool:
    """Tell whether a given version is at least the minimum supported CLI version.

    A version that cannot be parsed is reported as a `ValueError`.
    """
    try:
        candidate = Version(canonicalize_version(version))
        minimum = Version(MIN_SUPPORTED_CLI_VERSION)
    except InvalidVersion as err:
        raise ValueError("An invalid version was provided") from err
    else:
        return candidate >= minimum
def supported_targets(release_tag: str) -> List[str]:
    """Get the supported Rust target triples programmatically for a given release tag and return them.

    Targets are identified by their "target triple" which is the string to inform the compiler what kind of output
    should be produced. A target triple consists of three strings separated by a hyphen, with a possible fourth string
    at the end preceded by a hyphen. The first is the architecture, the second is the "vendor", the third is the OS
    type, and the optional fourth is environment type.

    The targets are derived from the release assets named `phylum-<target>.zip` and are returned de-duplicated,
    in sorted order. A release tag that is not among the supported releases ends the program with `SystemExit`.

    References:
      * https://doc.rust-lang.org/nightly/rustc/platform-support.html
      * https://rust-lang.github.io/rfcs/0131-target-specification.html
    """
    if release_tag not in supported_releases():
        raise SystemExit(f" [!] Unsupported version: {release_tag}")

    # API Reference: https://docs.github.com/en/rest/releases/releases#get-a-release-by-tag-name
    github_api_url = f"https://api.github.com/repos/phylum-dev/cli/releases/tags/{release_tag}"
    headers = {"Accept": "application/vnd.github+json"}
    req = requests.get(github_api_url, headers=headers, timeout=REQ_TIMEOUT)
    req.raise_for_status()
    assets = req.json().get("assets", [])

    prefix, suffix = "phylum-", ".zip"
    targets = set()
    for asset in assets:
        name = asset.get("name", "")
        if name.startswith(prefix) and name.endswith(suffix):
            # Slice off exactly the leading prefix and trailing suffix; `str.replace` would also remove
            # any further occurrence of either string from the middle of the target name.
            targets.add(name[len(prefix) : -len(suffix)])

    # Sort so that callers (and the `--list-targets` output) see a stable order across runs.
    return sorted(targets)
def version_check(version):
    """Validate a given version and return it in normalized form.

    The special value `latest` is resolved through `get_latest_version`. The normalized form is lower case
    with a leading `v`. A version that is not among the supported releases is rejected with an
    `argparse.ArgumentTypeError` that lists the supported ones.
    """
    requested = get_latest_version() if version == "latest" else version

    normalized = requested.lower()
    if not normalized.startswith("v"):
        normalized = f"v{normalized}"

    known_releases = supported_releases()
    if normalized in known_releases:
        return normalized

    releases = ", ".join(known_releases)
    raise argparse.ArgumentTypeError(f"version must be from a supported release: {releases}")
def get_target_triple():
    """Work out the "target triple" of the current system and return it.

    The machine and system names reported by `platform.uname()` are lower-cased and looked up in
    `SUPPORTED_ARCHES` and `SUPPORTED_PLATFORMS`; a name missing from either table becomes `unknown`.
    """
    machine_name = platform.uname().machine.lower()
    system_name = platform.uname().system.lower()

    arch = SUPPORTED_ARCHES.get(machine_name, "unknown")
    plat = SUPPORTED_PLATFORMS.get(system_name, "unknown")
    return f"{arch}-{plat}"
def save_file_from_url(url, path):
    """Save a file from a given URL to a local file path, in binary mode.

    The whole response body is downloaded first and then written to `path`. Progress is reported on
    standard output. An unsuccessful HTTP status is raised as an exception by `raise_for_status()`.
    """
    # The progress messages end without a newline, so flush them explicitly; otherwise a line-buffered
    # stdout would hold them back until the matching "Done" is printed, after the work has finished.
    print(f" [*] Getting {url} file ...", end="", flush=True)
    req = requests.get(url, timeout=REQ_TIMEOUT)
    req.raise_for_status()
    print("Done")

    print(f" [*] Saving {url} file to {path} ...", end="", flush=True)
    with open(path, "wb") as f:
        f.write(req.content)
    print("Done")
def get_archive_url(tag_name, archive_name):
    """Build the download URL of an archive from a given release tag name and archive name."""
    # Reference: https://docs.github.com/en/rest/releases/releases#get-a-release-by-tag-name
    url_template = "https://github.com/phylum-dev/cli/releases/download/{}/{}"
    return url_template.format(tag_name, archive_name)
def is_token_set(phylum_settings_path, token=None):
    """Check if any token is already set in the given CLI configuration file.

    Optionally, check if a specific given `token` is set.

    Returns `False` when the settings file does not exist, when it holds no `auth_info.offline_access`
    value, or when a `token` was given and differs from the configured one. Returns `True` otherwise.
    """
    try:
        settings_data = phylum_settings_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return False

    # An empty settings file loads as `None`, and an `auth_info:` key without a value holds `None`.
    # Treat both as "no token configured" instead of failing on the `.get` lookups.
    settings_dict = YAML().load(settings_data) or {}
    auth_info = settings_dict.get("auth_info") or {}
    configured_token = auth_info.get("offline_access")

    if configured_token is None:
        return False
    return token is None or token == configured_token
def process_token_option(args):
    """Handle the token option from the parsed arguments.

    Reports where the token comes from and how it relates to any token already present in the settings
    file, then calls `setup_token` when a supplied token is not the one currently configured.
    """
    phylum_settings_path = get_phylum_settings_path(args.version)

    # The token option takes precedence over the Phylum API key environment variable.
    env_token = os.getenv(TOKEN_ENVVAR_NAME)
    token = env_token if args.token is None else args.token

    if not token:
        print(f" [+] Phylum token NOT supplied as option or `{TOKEN_ENVVAR_NAME}` environment variable")
        if is_token_set(phylum_settings_path):
            print(" [+] Existing token found. It will be used without modification.")
        else:
            print(" [!] Existing token not found. Use `phylum auth login` or `phylum auth register` command to set it.")
        return

    print(f" [+] Phylum token supplied as an option or `{TOKEN_ENVVAR_NAME}` environment variable")
    if not is_token_set(phylum_settings_path):
        print(" [+] No existing token exists. Supplied token will be used.")
    else:
        print(" [+] An existing token is already set")
        if is_token_set(phylum_settings_path, token=token):
            print(" [+] Supplied token matches existing token")
        else:
            print(" [!] Supplied token will be used to overwrite the existing token")

    if not is_token_set(phylum_settings_path, token=token):
        setup_token(token, args)
def setup_token(token, args):
    """Setup the CLI credentials with a provided token and path to phylum binary.

    The token is stored as `auth_info.offline_access` in the Phylum settings file for `args.version`,
    and `phylum auth status` is run afterwards to confirm it. A failing CLI command is raised as
    `subprocess.CalledProcessError`.
    """
    phylum_bin_path = get_expected_phylum_bin_path(args.version)
    phylum_settings_path = get_phylum_settings_path(args.version)

    # The phylum CLI settings.yaml file won't exist upon initial install
    # but running a command will trigger the CLI to generate it
    if not phylum_settings_path.exists():
        # Commands are given as argument lists rather than split strings,
        # so that a binary path containing spaces stays a single argument.
        subprocess.run([str(phylum_bin_path), "version"], check=True)

    yaml = YAML()
    # An empty settings file loads as `None`, and an `auth_info:` key without a value holds `None`.
    # Start from an empty mapping in either case so the token can still be stored.
    settings_dict = yaml.load(phylum_settings_path.read_text(encoding="utf-8")) or {}
    if settings_dict.get("auth_info") is None:
        settings_dict["auth_info"] = {}
    settings_dict["auth_info"]["offline_access"] = token
    with open(phylum_settings_path, "w", encoding="utf-8") as f:
        yaml.dump(settings_dict, f)

    # Check that the token was setup correctly by using it to display the current auth status
    subprocess.run([str(phylum_bin_path), "auth", "status"], check=True)
def get_args(args=None):
    """Get the arguments from the command line and return them.

    `args` is handed straight to `parse_args`; leaving it as `None` makes argparse read `sys.argv`.
    The release option is validated and normalized by `version_check`, and the target option
    defaults to the triple detected by `get_target_triple`.
    """
    parser = argparse.ArgumentParser(
        prog=SCRIPT_NAME,
        description="Fetch and install the Phylum CLI",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version=f"{SCRIPT_NAME} {__version__}",
    )
    parser.add_argument(
        "-r",
        "--phylum-release",
        dest="version",
        default="latest",
        type=version_check,
        help=(
            "The version of the Phylum CLI to install. Can be specified as `latest` or a specific tagged release,\n"
            "with or without the leading `v`."
        ),
    )
    parser.add_argument(
        "-t",
        "--target",
        default=get_target_triple(),
        help="The target platform type where the CLI will be installed.",
    )
    parser.add_argument(
        "-k",
        "--phylum-token",
        dest="token",
        help=(
            f"Phylum user token. Can also specify this option's value by setting the `{TOKEN_ENVVAR_NAME}`\n"
            "environment variable. The value specified with this option takes precedence when both are provided.\n"
            "Leave this option and its related environment variable unspecified to either (1) use an existing token\n"
            "already set in the Phylum config file or (2) to manually populate the token with a `phylum auth login` or\n"
            "`phylum auth register` command after install."
        ),
    )

    # Listing releases and listing targets are alternative informational modes, so only one may be given.
    list_group = parser.add_mutually_exclusive_group()
    list_group.add_argument(
        "--list-releases",
        action="store_true",
        help="List the Phylum CLI releases available to install.",
    )
    list_group.add_argument(
        "--list-targets",
        action="store_true",
        help="List the target platform types available for installing a given Phylum CLI release.",
    )

    return parser.parse_args(args=args)
def main(args=None):
    """Main entrypoint.

    Parses the arguments and then either lists the supported releases, lists the supported targets for
    the chosen release, or installs the Phylum CLI. Installing downloads the release archive and its
    minisig signature into a temporary directory, passes both to `verify_minisig`, extracts the
    archive, runs its `install.sh`, processes the token option, and finally runs the installed binary
    with `--help` as a sanity check.

    Returns 0 on success. An unsupported target ends the program with `SystemExit`, a corrupt archive
    raises `zipfile.BadZipFile`, and a failing command raises `subprocess.CalledProcessError`.
    """
    args = get_args(args=args)

    if args.list_releases:
        print("Looking up supported releases ...")
        releases = ", ".join(supported_releases())
        print(f"Supported releases: {releases}")
        return 0

    tag_name = args.version
    supported_target_triples = supported_targets(tag_name)
    if args.list_targets:
        print(f"Looking up supported targets for release {tag_name} ...")
        targets = ", ".join(supported_target_triples)
        print(f"Supported targets for release {tag_name}: {targets}")
        return 0

    target_triple = args.target
    if target_triple not in supported_target_triples:
        raise SystemExit(f" [!] The identified target triple `{target_triple}` is not supported for release {tag_name}")

    archive_name = f"phylum-{target_triple}.zip"
    minisig_name = f"{archive_name}.minisig"
    archive_url = get_archive_url(tag_name, archive_name)
    minisig_url = f"{archive_url}.minisig"
    phylum_bin_path = get_expected_phylum_bin_path(tag_name)

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = pathlib.Path(temp_dir)
        archive_path = temp_dir_path / archive_name
        minisig_path = temp_dir_path / minisig_name

        save_file_from_url(archive_url, archive_path)
        save_file_from_url(minisig_url, minisig_path)

        # Check the signature before anything from the archive is extracted or executed
        verify_minisig(archive_path, minisig_path)

        with zipfile.ZipFile(archive_path, mode="r") as zip_file:
            if zip_file.testzip() is not None:
                raise zipfile.BadZipFile(f"There was a bad file in the zip archive {archive_name}")
            extracted_dir = temp_dir_path / f"phylum-{target_triple}"
            zip_file.extractall(path=temp_dir)

        # The install script is run from the directory it was extracted to
        subprocess.run(["sh", "install.sh"], check=True, cwd=extracted_dir)

    process_token_option(args)

    # Check to ensure everything is working.
    # The command is an argument list rather than a split string,
    # so that a binary path containing spaces stays a single argument.
    subprocess.run([str(phylum_bin_path), "--help"], check=True)

    return 0
# Run the installer when this module is executed as a script, using the value returned by `main` as
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())