-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
_server.py
139 lines (120 loc) 路 5.23 KB
/
_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import sys
import traceback
from contextlib import suppress
import grpc
from ._workspace import PulumiFn
from .. import log
from ..runtime.proto import language_pb2, plugin_pb2, LanguageRuntimeServicer
from ..runtime import run_in_stack, reset_options, set_all_config
from ..runtime.rpc_manager import RPC_MANAGER
from ..errors import RunError
_py_version_less_than_3_7 = sys.version_info[0] == 3 and sys.version_info[1] < 7
class LanguageServer(LanguageRuntimeServicer):
program: PulumiFn
def __init__(self, program: PulumiFn) -> None:
self.program = program # type: ignore
@staticmethod
def on_pulumi_exit():
# Reset globals
reset_options()
def GetRequiredPlugins(self, request, context):
return language_pb2.GetRequiredPluginsResponse()
def _exception_handler(self, loop, context):
# Exception are normally handler deeper in the stack. If this class of
# exception bubble up to here, something is wrong and we should stop
# the event loop
if "exception" in context and isinstance(context["exception"], grpc.RpcError):
loop.stop()
else:
loop.default_exception_handler(context)
def Run(self, request, context):
_suppress_unobserved_task_logging()
# Configure the runtime so that the user program hooks up to Pulumi as appropriate.
engine_address = request.args[0] if request.args else ""
organization = request.organization if request.organization else "organization"
reset_options(
project=request.project,
monitor_address=request.monitor_address,
engine_address=engine_address,
stack=request.stack,
parallel=request.parallel,
preview=request.dryRun,
organization=organization,
)
if request.config:
secret_keys = request.configSecretKeys if request.configSecretKeys else None
set_all_config(request.config, secret_keys)
# The strategy here is derived from sdk/python/cmd/pulumi-language-python-exec
result = language_pb2.RunResponse()
loop = asyncio.new_event_loop()
loop.set_exception_handler(self._exception_handler)
try:
loop.run_until_complete(run_in_stack(self.program))
except RunError as exn:
msg = str(exn)
log.error(msg)
result.error = str(msg)
return result
except grpc.RpcError as exn:
# If the monitor is unavailable, it is in the process of shutting down or has already
# shut down. Don't emit an error if this is the case.
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
log.debug("Resource monitor has terminated, shutting down.")
else:
msg = f"RPC error: {exn.details()}"
log.error(msg)
result.error = msg
return result
except Exception as exn:
msg = str(
f"python inline source runtime error: {exn}\n{traceback.format_exc()}"
)
log.error(msg)
result.error = msg
return result
finally:
# If there's an exception during `run_in_stack`, it may result in pending asyncio tasks remaining unresolved
# at the time the loop is closed, which results in a `Task was destroyed but it is pending!` error being
# logged to stdout. To avoid this, we collect all the unresolved tasks in the loop and cancel them before
# closing the loop.
pending = (
# lint safety: we use the python version here to track deprecations
# pylint: disable=no-member
asyncio.Task.all_tasks(loop)
if _py_version_less_than_3_7
else asyncio.all_tasks(loop)
) # pylint: disable=no-member
log.debug(f"Cancelling {len(pending)} tasks.")
for task in pending:
task.cancel()
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
loop.close()
sys.stdout.flush()
sys.stderr.flush()
RPC_MANAGER.clear()
return result
def GetPluginInfo(self, request, context):
return plugin_pb2.PluginInfo()
def _suppress_unobserved_task_logging():
"""Suppresses logs about faulted unobserved tasks. This is similar to
Python Pulumi user programs. See rationale in
`sdk/python/cmd/pulumi-language-python-exec`.
"""
logging.getLogger("asyncio").setLevel(logging.CRITICAL)