-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
pulumi-language-python-exec
executable file
路205 lines (182 loc) 路 9.91 KB
/
pulumi-language-python-exec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python
# Copyright 2016-2018, Pulumi Corporation. All rights reserved.
import argparse
import asyncio
from typing import Optional
import logging
import os
import sys
import traceback
import runpy
from concurrent.futures import ThreadPoolExecutor
# The user might not have installed Pulumi yet in their environment - provide a high-quality error message in that case.
try:
import pulumi
import pulumi.runtime
except ImportError:
# For whatever reason, sys.stderr.write is not picked up by the engine as a message, but 'print' is. The Python
# langhost automatically flushes stdout and stderr on shutdown, so we don't need to do it here - just trust that
# Python does the sane thing when printing to stderr.
print(traceback.format_exc(), file=sys.stderr)
print("""
It looks like the Pulumi SDK has not been installed. Have you run pip install?
If you are running in a virtualenv, you must run pip install -r requirements.txt from inside the virtualenv.""", file=sys.stderr)
sys.exit(1)
# use exit code 32 to signal to the language host that an error message was displayed to the user
PYTHON_PROCESS_EXITED_AFTER_SHOWING_USER_ACTIONABLE_MESSAGE_CODE = 32
def get_abs_module_path(mod_path):
path, ext = os.path.splitext(mod_path)
if not ext:
path = os.path.join(path, '__main__')
return os.path.abspath(path)
def _get_user_stacktrace(user_program_abspath: str) -> str:
'''grabs the current stacktrace and truncates it to show the only stacks pertaining to a user's program'''
tb = traceback.extract_tb(sys.exc_info()[2])
for frame_index, frame in enumerate(tb):
# loop over stack frames until we reach the main program
# then return the traceback truncated to the user's code
cur_module = frame[0]
if get_abs_module_path(user_program_abspath) == get_abs_module_path(cur_module):
# we have detected the start of a user's stack trace
remaining_frames = len(tb)-frame_index
# include remaining frames from the bottom by negating
return traceback.format_exc(limit=-remaining_frames)
# we did not detect a __main__ program, return normal traceback
return traceback.format_exc()
def _set_default_executor(loop, parallelism: Optional[int]):
'''configure this event loop to respect the settings provided.'''
if parallelism is None:
return
parallelism = max(parallelism, 1)
exec = ThreadPoolExecutor(max_workers=parallelism)
loop.set_default_executor(exec)
if __name__ == "__main__":
# Parse the arguments, program name, and optional arguments.
ap = argparse.ArgumentParser(description='Execute a Pulumi Python program')
ap.add_argument('--project', help='Set the project name')
ap.add_argument('--stack', help='Set the stack name')
ap.add_argument('--parallel', help='Run P resource operations in parallel (default=none)')
ap.add_argument('--dry_run', help='Simulate resource changes, but without making them')
ap.add_argument('--pwd', help='Change the working directory before running the program')
ap.add_argument('--monitor', help='An RPC address for the resource monitor to connect to')
ap.add_argument('--engine', help='An RPC address for the engine to connect to')
ap.add_argument('--tracing', help='A Zipkin-compatible endpoint to send tracing data to')
ap.add_argument('--organization', help='Set the organization name')
ap.add_argument('PROGRAM', help='The Python program to run')
ap.add_argument('ARGS', help='Arguments to pass to the program', nargs='*')
args = ap.parse_args()
# If any config variables are present, parse and set them, so subsequent accesses are fast.
config_env = pulumi.runtime.get_config_env()
if hasattr(pulumi.runtime, "get_config_secret_keys_env") and hasattr(pulumi.runtime, "set_all_config"):
# If the pulumi SDK has `get_config_secret_keys_env` and `set_all_config`, use them
# to set the config and secret keys.
config_secret_keys_env = pulumi.runtime.get_config_secret_keys_env()
pulumi.runtime.set_all_config(config_env, config_secret_keys_env)
else:
# Otherwise, fallback to setting individual config values.
for k, v in config_env.items():
pulumi.runtime.set_config(k, v)
# Configure the runtime so that the user program hooks up to Pulumi as appropriate.
# New versions of pulumi python support setting organization, old versions do not
try:
settings = pulumi.runtime.Settings(
monitor=args.monitor,
engine=args.engine,
project=args.project,
stack=args.stack,
parallel=int(args.parallel),
dry_run=args.dry_run == "true",
organization=args.organization,
)
except TypeError:
settings = pulumi.runtime.Settings(
monitor=args.monitor,
engine=args.engine,
project=args.project,
stack=args.stack,
parallel=int(args.parallel),
dry_run=args.dry_run == "true"
)
pulumi.runtime.configure(settings)
# Finally, swap in the args, chdir if needed, and run the program as if it had been executed directly.
sys.argv = [args.PROGRAM] + args.ARGS
if args.pwd is not None:
os.chdir(args.pwd)
successful = False
# asyncio.get_running_loop was only added in python 3.7 but we still support python 3.6
# In python 3.10, asyncio.get_event_loop prints a deprecation warning if no loop is present
# This code will be cleaned up as part of https://github.com/pulumi/pulumi/issues/8131
if sys.version_info[0] == 3 and sys.version_info[1] < 7:
loop = asyncio.get_event_loop()
else:
try:
# The docs for get_running_loop are somewhat misleading because they state:
# This function can only be called from a coroutine or a callback. However, if the function is
# called from outside a coroutine or callback (the standard case when running `pulumi up`), the function
# raises a RuntimeError as expected and falls through to the exception clause below.
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Configure the event loop to respect the parallelism value provided as input.
_set_default_executor(loop, settings.parallel)
# We are (unfortunately) suppressing the log output of asyncio to avoid showing to users some of the bad things we
# do in our programming model.
#
# Fundamentally, Pulumi is a way for users to build asynchronous dataflow graphs that, as their deployments
# progress, resolve naturally and eventually result in the complete resolution of the graph. If one node in the
# graph fails (i.e. a resource fails to create, there's an exception in an apply, etc.), part of the graph remains
# unevaluated at the time that we exit.
#
# asyncio abhors this. It gets very upset if the process terminates without having observed every future that we
# have resolved. If we are terminating abnormally, it is highly likely that we are not going to observe every single
# future that we have created. Furthermore, it's *harmless* to do this - asyncio logs errors because it thinks it
# needs to tell users that they're doing bad things (which, to their credit, they are), but we are doing this
# deliberately.
#
# In order to paper over this for our users, we simply turn off the logger for asyncio. Users won't see any asyncio
# error messages, but if they stick to the Pulumi programming model, they wouldn't be seeing any anyway.
logging.getLogger("asyncio").setLevel(logging.CRITICAL)
exit_code = 1
try:
# record the location of the user's program to return user tracebacks
user_program_abspath = os.path.abspath(args.PROGRAM)
def run():
try:
return runpy.run_path(args.PROGRAM, run_name='__main__')
except ImportError as e:
# detect if the main pulumi python program does not exist
stack_modules = [f.filename for f in traceback.extract_tb(e.__traceback__)]
unique_modules = set(module for module in stack_modules)
last_module_name = stack_modules[-1]
# we identify a missing program error if
# 1. the only modules in the stack trace are
# - `pulumi-language-python-exec`
# - `runpy`
# 2. the last function in the stack trace is in the `runpy` module
if unique_modules == {
__file__, # the language runtime itself
runpy.__file__,
} and last_module_name == runpy.__file__ :
# this error will only be hit when the user provides a directory
# the engine has a check to determine if the `main` file exists and will fail early
# if a language runtime receives a directory, it's the language's responsibility to determine
# whether the provided directory has a pulumi program
pulumi.log.error(f"unable to find main python program `__main__.py` in `{user_program_abspath}`")
sys.exit(PYTHON_PROCESS_EXITED_AFTER_SHOWING_USER_ACTIONABLE_MESSAGE_CODE)
else:
raise e
coro = pulumi.runtime.run_in_stack(run)
loop.run_until_complete(coro)
exit_code = 0
except pulumi.RunError as e:
pulumi.log.error(str(e))
except Exception:
error_msg = "Program failed with an unhandled exception:\n" + _get_user_stacktrace(user_program_abspath)
pulumi.log.error(error_msg)
exit_code = PYTHON_PROCESS_EXITED_AFTER_SHOWING_USER_ACTIONABLE_MESSAGE_CODE
finally:
loop.close()
sys.stdout.flush()
sys.stderr.flush()
sys.exit(exit_code)