Skip to content

Commit

Permalink
[cli] Experimental support for remote operations
Browse files Browse the repository at this point in the history
  • Loading branch information
justinvp committed Oct 27, 2022
1 parent 159988d commit d25efd8
Show file tree
Hide file tree
Showing 18 changed files with 962 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/backend/display/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ type Options struct {
Debug bool // true to enable debug output.
Stdout io.Writer // the writer to use for stdout. Defaults to os.Stdout if unset.
Stderr io.Writer // the writer to use for stderr. Defaults to os.Stderr if unset.
SuppressTimings bool // true to suppress displaying timings of resource actions
}
9 changes: 9 additions & 0 deletions pkg/backend/display/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,11 @@ func (display *ProgressDisplay) getStepDoneDescription(step engine.StepEventMeta
return ""
}
}

if display.opts.SuppressTimings {
return opText
}

start, ok := display.opStopwatch.start[step.URN]
if !ok {
return opText
Expand Down Expand Up @@ -1495,6 +1500,10 @@ func (display *ProgressDisplay) getStepInProgressDescription(step engine.StepEve
return ""
}

if display.opts.SuppressTimings {
return opText
}

// Calculate operation time elapsed.
start, ok := display.opStopwatch.start[step.URN]
if !ok {
Expand Down
137 changes: 136 additions & 1 deletion pkg/backend/httpstate/backend.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,6 +109,9 @@ type Backend interface {

StackConsoleURL(stackRef backend.StackReference) (string, error)
Client() *client.Client

RunDeployment(ctx context.Context, stackRef backend.StackReference, req apitype.CreateDeploymentRequest,
opts display.Options) error
}

type cloudBackend struct {
Expand Down Expand Up @@ -1594,6 +1597,138 @@ func (b *cloudBackend) UpdateStackTags(ctx context.Context,
return b.client.UpdateStackTags(ctx, stackID, tags)
}

const pulumiOperationHeader = "Pulumi operation"

func (b *cloudBackend) RunDeployment(ctx context.Context, stackRef backend.StackReference,
req apitype.CreateDeploymentRequest, opts display.Options) error {

stackID, err := b.getCloudStackIdentifier(stackRef)
if err != nil {
return err
}

resp, err := b.client.CreateDeployment(ctx, stackID, req)
if err != nil {
return err
}
id := resp.ID

fmt.Printf(opts.Color.Colorize(colors.SpecHeadline + "Preparing deployment..." + colors.Reset + "\n\n"))

if !opts.SuppressPermalink && !opts.JSONDisplay {
if resp.ConsoleURL != "" {
fmt.Printf(opts.Color.Colorize(
colors.SpecHeadline+"View Live: "+
colors.Underline+colors.BrightBlue+"%s"+colors.Reset+"\n"), resp.ConsoleURL)
}
}

token := ""
for {
logs, err := b.client.GetDeploymentLogs(ctx, stackID, id, token)
if err != nil {
return err
}

for _, l := range logs.Lines {
if l.Header != "" {
fmt.Printf(opts.Color.Colorize(
"\n" + colors.SpecHeadline + l.Header + ":" + colors.Reset + "\n"))

// If we see it's a Pulumi operation, rather than outputting the deployment logs,
// find the associated update and show the normal rendering of the operation's events.
if l.Header == pulumiOperationHeader {
fmt.Println()
return b.showDeploymentEvents(ctx, stackID, apitype.UpdateKind(req.Operation.Operation), id, opts)
}
} else {
fmt.Print(l.Line)
}
}

// If there are no more logs for the deployment and the deployment has finished or we're not following,
// then we're done.
if logs.NextToken == "" {
break
}

// Otherwise, update the token, sleep, and loop around.
if logs.NextToken == token {
time.Sleep(500 * time.Millisecond)
}
token = logs.NextToken
}

return nil
}

func (b *cloudBackend) showDeploymentEvents(ctx context.Context, stackID client.StackIdentifier,
kind apitype.UpdateKind, deploymentID string, opts display.Options) error {

getUpdateID := func() (string, error) {
for tries := 0; tries < 10; tries++ {
updates, err := b.client.GetDeploymentUpdates(ctx, stackID, deploymentID)
if err != nil {
return "", err
}
if len(updates) > 0 {
return updates[0].UpdateID, nil
}

time.Sleep(500 * time.Millisecond)
}
return "", fmt.Errorf("could not find update associated with deployment %s", deploymentID)
}

updateID, err := getUpdateID()
if err != nil {
return err
}

dryRun := kind == apitype.PreviewUpdate
update := client.UpdateIdentifier{
StackIdentifier: stackID,
UpdateKind: kind,
UpdateID: updateID,
}

events := make(chan engine.Event) // Note: unbuffered, but we assume it won't matter in practice.
done := make(chan bool)

// Timings do not display correctly when rendering remote events, so suppress showing them.
opts.SuppressTimings = true

go display.ShowEvents(
backend.ActionLabel(kind, dryRun), kind, tokens.Name(stackID.Stack), tokens.PackageName(stackID.Project),
events, done, opts, dryRun)

// The UpdateEvents API returns a continuation token to only get events after the previous call.
var continuationToken *string
for {
resp, err := b.client.GetUpdateEngineEvents(ctx, update, continuationToken)
if err != nil {
return err
}
for _, jsonEvent := range resp.Events {
event, err := display.ConvertJSONEvent(jsonEvent)
if err != nil {
return err
}
events <- event
}

continuationToken = resp.ContinuationToken
// A nil continuation token means there are no more events to read and the update has finished.
if continuationToken == nil {
close(events)
<-done
return nil
}

time.Sleep(500 * time.Millisecond)
}
}

type httpstateBackendClient struct {
backend Backend
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/backend/httpstate/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,23 @@ func (pc *Client) CompleteUpdate(ctx context.Context, update UpdateIdentifier, s
updateAccessToken(token), httpCallOptions{RetryAllMethods: true})
}

// GetUpdateEngineEvents returns the engine events for an update.
func (pc *Client) GetUpdateEngineEvents(ctx context.Context, update UpdateIdentifier,
continuationToken *string) (apitype.GetUpdateEventsResponse, error) {

path := getUpdatePath(update, "events")
if continuationToken != nil {
path += fmt.Sprintf("?continuationToken=%s", *continuationToken)
}

var resp apitype.GetUpdateEventsResponse
if err := pc.restCall(ctx, "GET", path, nil, nil, &resp); err != nil {
return apitype.GetUpdateEventsResponse{}, err
}

return resp, nil
}

// RecordEngineEvents posts a batch of engine events to the Pulumi service.
func (pc *Client) RecordEngineEvents(
ctx context.Context, update UpdateIdentifier, batch apitype.EngineEventBatch, token string) error {
Expand All @@ -1013,3 +1030,43 @@ func (pc *Client) UpdateStackTags(

return pc.restCall(ctx, "PATCH", getStackPath(stack, "tags"), nil, tags, nil)
}

func getDeploymentPath(stack StackIdentifier, components ...string) string {
prefix := fmt.Sprintf("/api/preview/%s/%s/%s/deployments", stack.Owner, stack.Project, stack.Stack)
return path.Join(append([]string{prefix}, components...)...)
}

func (pc *Client) CreateDeployment(ctx context.Context, stack StackIdentifier,
req apitype.CreateDeploymentRequest) (*apitype.CreateDeploymentResponse, error) {

var resp apitype.CreateDeploymentResponse
err := pc.restCall(ctx, http.MethodPost, getDeploymentPath(stack), nil, req, &resp)
if err != nil {
return nil, fmt.Errorf("creating deployment failed: %w", err)
}
return &resp, nil
}

func (pc *Client) GetDeploymentLogs(ctx context.Context, stack StackIdentifier, id,
token string) (*apitype.DeploymentLogs, error) {

path := getDeploymentPath(stack, id, fmt.Sprintf("logs?continuationToken=%s", token))
var resp apitype.DeploymentLogs
err := pc.restCall(ctx, http.MethodGet, path, nil, nil, &resp)
if err != nil {
return nil, fmt.Errorf("getting deployment %s logs failed: %w", id, err)
}
return &resp, nil
}

func (pc *Client) GetDeploymentUpdates(ctx context.Context, stack StackIdentifier,
id string) ([]apitype.GetDeploymentUpdatesUpdateInfo, error) {

path := getDeploymentPath(stack, id, "updates")
var resp []apitype.GetDeploymentUpdatesUpdateInfo
err := pc.restCall(ctx, http.MethodGet, path, nil, nil, &resp)
if err != nil {
return nil, fmt.Errorf("getting deployment %s updates failed: %w", id, err)
}
return resp, nil
}
1 change: 1 addition & 0 deletions pkg/cmd/pulumi/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func getStackSecretsManager(s backend.Stack) (secrets.Manager, error) {
return nil, err
}

// nolint: goconst
if ps.SecretsProvider != passphrase.Type && ps.SecretsProvider != "default" && ps.SecretsProvider != "" {
return newCloudSecretsManager(s.Ref().Name(), configFile, ps.SecretsProvider)
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/cmd/pulumi/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pulumi/pulumi/pkg/v3/engine"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/graph"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand All @@ -43,6 +44,9 @@ func newDestroyCmd() *cobra.Command {
var execKind string
var execAgent string

// Flags for remote operations.
remoteArgs := RemoteArgs{}

// Flags for engine.UpdateOptions.
var jsonDisplay bool
var diffDisplay bool
Expand All @@ -60,8 +64,13 @@ func newDestroyCmd() *cobra.Command {
var targetDependents bool
var excludeProtected bool

use, cmdArgs := "destroy", cmdutil.NoArgs
if remoteSupported() {
use, cmdArgs = "destroy [url]", cmdutil.MaximumNArgs(1)
}

var cmd = &cobra.Command{
Use: "destroy",
Use: use,
Aliases: []string{"down"},
SuggestFor: []string{"delete", "kill", "remove", "rm", "stop"},
Short: "Destroy all existing resources in the stack",
Expand All @@ -75,9 +84,15 @@ func newDestroyCmd() *cobra.Command {
"`--remove` flag to delete the stack.\n" +
"\n" +
"Warning: this command is generally irreversible and should be used with great care.",
Args: cmdutil.NoArgs,
Args: cmdArgs,
Run: cmdutil.RunResultFunc(func(cmd *cobra.Command, args []string) result.Result {
ctx := commandContext()

// Remote implies we're skipping previews.
if remoteArgs.remote {
skipPreview = true
}

yes = yes || skipPreview || skipConfirmations()
interactive := cmdutil.Interactive()
if !interactive && !yes {
Expand Down Expand Up @@ -117,6 +132,22 @@ func newDestroyCmd() *cobra.Command {
opts.Display.SuppressPermalink = false
}

if remoteArgs.remote {
if len(args) == 0 {
return result.FromError(errors.New("must specify remote URL"))
}

err = validateUnsupportedRemoteFlags(false, nil, false, "", jsonDisplay, nil,
nil, refresh, showConfig, showReplacementSteps, showSames, false,
suppressOutputs, "default", targets, nil, nil,
targetDependents, "", stackConfigFile)
if err != nil {
return result.FromError(err)
}

return runDeployment(ctx, opts.Display, apitype.Destroy, stack, args[0], remoteArgs)
}

filestateBackend, err := isFilestateBackend(opts.Display)
if err != nil {
return result.FromError(err)
Expand Down Expand Up @@ -316,6 +347,9 @@ func newDestroyCmd() *cobra.Command {
&yes, "yes", "y", false,
"Automatically approve and perform the destroy after previewing it")

// Remote flags
remoteArgs.applyFlags(cmd)

if hasDebugCommands() {
cmd.PersistentFlags().StringVar(
&eventLogPath, "event-log", "",
Expand Down

0 comments on commit d25efd8

Please sign in to comment.