Skip to content

Commit

Permalink
Merge branch 'pulumi:master' into closure-require-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pjoe committed Nov 29, 2022
2 parents e7c153c + 9086bf9 commit c37df22
Show file tree
Hide file tree
Showing 22 changed files with 284 additions and 114 deletions.
21 changes: 0 additions & 21 deletions .github/workflows/download-pulumi-cron.yml
Expand Up @@ -94,27 +94,6 @@ jobs:
run: |
echo "Expected version ${{ steps.vars.outputs.expected-version }} but found ${{ steps.vars.outputs.installed-version }}"
exit 1
windows-winget-install:
name: Install Pulumi with WinGet on Windows
runs-on: windows-latest
steps:
- name: Install WinGet CLI
shell: powershell
run: |
try { winget --help } catch { Add-AppxPackage -Path https://aka.ms/getwinget }
- name: Install Pulumi Using Winget
run: winget install pulumi
- name: Pulumi Version Details
id: vars
shell: bash
run: |
echo "::set-output name=installed-version::$(pulumi version)"
echo "::set-output name=expected-version::v$(curl -sS https://www.pulumi.com/latest-version)"
- name: Error if incorrect version found
if: ${{ steps.vars.outputs.expected-version != steps.vars.outputs.installed-version }}
run: |
echo "Expected version ${{ steps.vars.outputs.expected-version }} but found ${{ steps.vars.outputs.installed-version }}"
exit 1
windows-direct-install:
name: Install Pulumi via script on Windows
runs-on: windows-latest
Expand Down
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: backend/service
description: Allows the service to opt into a bandwidth-optimized DIFF protocol for storing checkpoints. Previously this required setting the PULUMI_OPTIMIZED_CHECKPOINT_PATCH env variable on the client. This env variable is now deprecated.
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: programgen/nodejs
description: Add `.` between `?` and `[`.
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: programgen/nodejs
description: Fix capitalization when generating `fs.readdirSync`.
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: programgen/go
description: Convert the result of immediate invokes to ouputs when necessary.
67 changes: 66 additions & 1 deletion pkg/backend/httpstate/backend.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
cryptorand "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -29,6 +30,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -119,6 +121,7 @@ type cloudBackend struct {
url string
client *client.Client
currentProject *workspace.Project
capabilities func(context.Context) capabilities
}

// Assert we implement the backend.Backend and backend.SpecificDeploymentExporter interfaces.
Expand All @@ -139,11 +142,15 @@ func New(d diag.Sink, cloudURL string) (Backend, error) {
currentProject = nil
}

client := client.NewClient(cloudURL, apiToken, d)
capabilities := detectCapabilities(d, client)

return &cloudBackend{
d: d,
url: cloudURL,
client: client.NewClient(cloudURL, apiToken, d),
client: client,
currentProject: currentProject,
capabilities: capabilities,
}, nil
}

Expand Down Expand Up @@ -767,6 +774,10 @@ func (b *cloudBackend) GetStack(ctx context.Context, stackRef backend.StackRefer
return nil, err
}

// GetStack is typically the initial call to a series of calls to the backend. Although logically unrelated,
// this is a good time to start detecting capabilities so that capability request is not on the critical path.
go b.capabilities(ctx)

stack, err := b.client.GetStack(ctx, stackID)
if err != nil {
// If this was a 404, return nil, nil as per this method's contract.
Expand Down Expand Up @@ -1753,3 +1764,57 @@ func (c httpstateBackendClient) GetStackResourceOutputs(
ctx context.Context, name string) (resource.PropertyMap, error) {
return backend.NewBackendClient(c.backend).GetStackResourceOutputs(ctx, name)
}

// Represents feature-detected capabilities of the service the backend is connected to.
type capabilities struct {
// If non-nil, indicates that delta checkpoint updates are supported.
deltaCheckpointUpdates *apitype.DeltaCheckpointUploadsConfigV1
}

// Builds a lazy wrapper around doDetectCapabilities.
func detectCapabilities(d diag.Sink, client *client.Client) func(ctx context.Context) capabilities {
var once sync.Once
var caps capabilities
done := make(chan struct{})
get := func(ctx context.Context) capabilities {
once.Do(func() {
caps = doDetectCapabilities(ctx, d, client)
close(done)
})
<-done
return caps
}
return get
}

func doDetectCapabilities(ctx context.Context, d diag.Sink, client *client.Client) capabilities {
resp, err := client.GetCapabilities(ctx)
if err != nil {
d.Warningf(diag.Message("" /*urn*/, "failed to get capabilities: %v"), err)
return capabilities{}
}
caps, err := decodeCapabilities(resp.Capabilities)
if err != nil {
d.Warningf(diag.Message("" /*urn*/, "failed to decode capabilities: %v"), err)
return capabilities{}
}
return caps
}

func decodeCapabilities(wireLevel []apitype.APICapabilityConfig) (capabilities, error) {
var parsed capabilities
for _, entry := range wireLevel {
switch entry.Capability {
case apitype.DeltaCheckpointUploads:
var cap apitype.DeltaCheckpointUploadsConfigV1
if err := json.Unmarshal(entry.Configuration, &cap); err != nil {
msg := "decoding DeltaCheckpointUploadsConfigV1 returned %w"
return capabilities{}, fmt.Errorf(msg, err)
}
parsed.deltaCheckpointUpdates = &cap
default:
continue
}
}
return parsed, nil
}
2 changes: 2 additions & 0 deletions pkg/backend/httpstate/client/api_endpoints.go
Expand Up @@ -78,6 +78,8 @@ func init() {
routes.Path(path).Methods(method).Name(name)
}

addEndpoint("GET", "/api/capabilities", "getCapabilities")

addEndpoint("GET", "/api/user", "getCurrentUser")
addEndpoint("GET", "/api/user/stacks", "listUserStacks")
addEndpoint("GET", "/api/stacks/{orgName}", "listOrganizationStacks")
Expand Down
34 changes: 33 additions & 1 deletion pkg/backend/httpstate/client/client.go
Expand Up @@ -49,6 +49,9 @@ type Client struct {
apiOrgs []string
diag diag.Sink
client restClient

// If true, do not probe the backend with GET /api/capabilities and assume no capabilities.
DisableCapabilityProbing bool
}

// newClient creates a new Pulumi API client with the given URL and API token. It is a variable instead of a regular
Expand Down Expand Up @@ -321,7 +324,7 @@ func (pc *Client) GetLatestConfiguration(ctx context.Context, stackID StackIdent
func (pc *Client) DoesProjectExist(ctx context.Context, owner string, projectName string) (bool, error) {
if err := pc.restCall(ctx, "HEAD", getProjectPath(owner, projectName), nil, nil, nil); err != nil {
// If this was a 404, return false - project not found.
if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == http.StatusNotFound {
if is404(err) {
return false, nil
}

Expand Down Expand Up @@ -1070,3 +1073,32 @@ func (pc *Client) GetDeploymentUpdates(ctx context.Context, stack StackIdentifie
}
return resp, nil
}

func (pc *Client) GetCapabilities(ctx context.Context) (*apitype.CapabilitiesResponse, error) {
if pc.DisableCapabilityProbing {
return &apitype.CapabilitiesResponse{}, nil
}

var resp apitype.CapabilitiesResponse
err := pc.restCall(ctx, http.MethodGet, "/api/capabilities", nil, nil, &resp)
if is404(err) {
// The client continues to support legacy backends. They do not support /api/capabilities and are
// assumed here to have no additional capabilities.
return &apitype.CapabilitiesResponse{}, nil
}
if err != nil {
return nil, fmt.Errorf("querying capabilities failed: %w", err)
}
return &resp, nil
}

func is404(err error) bool {
if err == nil {
return false
}
var errResp *apitype.ErrorResponse
if errors.As(err, &errResp) && errResp.Code == http.StatusNotFound {
return true
}
return false
}
43 changes: 43 additions & 0 deletions pkg/backend/httpstate/client/client_test.go
Expand Up @@ -179,3 +179,46 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {

assert.Equal(t, string(indented), string(request.UntypedDeployment))
}

func TestGetCapabilities(t *testing.T) {
t.Parallel()
t.Run("legacy-service-404", func(t *testing.T) {
t.Parallel()
s := newMockServer(404, "NOT FOUND")
defer s.Close()

c := newMockClient(s)
resp, err := c.GetCapabilities(context.Background())
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Empty(t, resp.Capabilities)
})
t.Run("updated-service-with-delta-checkpoint-capability", func(t *testing.T) {
t.Parallel()
cfg := apitype.DeltaCheckpointUploadsConfigV1{
CheckpointCutoffSizeBytes: 1024 * 1024 * 4,
}
cfgJSON, err := json.Marshal(cfg)
require.NoError(t, err)
actualResp := apitype.CapabilitiesResponse{
Capabilities: []apitype.APICapabilityConfig{{
Version: 3,
Capability: apitype.DeltaCheckpointUploads,
Configuration: json.RawMessage(cfgJSON),
}},
}
respJSON, err := json.Marshal(actualResp)
require.NoError(t, err)
s := newMockServer(200, string(respJSON))
defer s.Close()

c := newMockClient(s)
resp, err := c.GetCapabilities(context.Background())
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Len(t, resp.Capabilities, 1)
assert.Equal(t, apitype.DeltaCheckpointUploads, resp.Capabilities[0].Capability)
assert.Equal(t, `{"checkpointCutoffSizeBytes":4194304}`,
string(resp.Capabilities[0].Configuration))
})
}
29 changes: 12 additions & 17 deletions pkg/backend/httpstate/diffs.go
Expand Up @@ -33,8 +33,6 @@ import (
type deploymentDiffState struct {
lastSavedDeployment json.RawMessage
sequenceNumber int
noChecksums bool
strictMode bool
minimalDiffSize int
}

Expand All @@ -44,8 +42,11 @@ type deploymentDiff struct {
deploymentDelta json.RawMessage
}

func newDeploymentDiffState() *deploymentDiffState {
return &deploymentDiffState{sequenceNumber: 1}
func newDeploymentDiffState(minimalDiffSize int) *deploymentDiffState {
return &deploymentDiffState{
sequenceNumber: 1,
minimalDiffSize: minimalDiffSize,
}
}

func (dds *deploymentDiffState) SequenceNumber() int {
Expand All @@ -62,14 +63,10 @@ func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool
if !dds.CanDiff() {
return false
}
small := dds.minimalDiffSize
if small == 0 {
small = 1024 * 32
}
if len(dds.lastSavedDeployment) < small {
if len(dds.lastSavedDeployment) < dds.minimalDiffSize {
return false
}
if len(new.Deployment) < small {
if len(new.Deployment) < dds.minimalDiffSize {
return false
}
return true
Expand Down Expand Up @@ -99,13 +96,11 @@ func (dds *deploymentDiffState) Diff(ctx context.Context,
var checkpointHash string
checkpointHashReady := &sync.WaitGroup{}

if !dds.noChecksums {
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()
}
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()

delta, err := dds.computeEdits(childCtx, string(before), string(after))
if err != nil {
Expand Down
20 changes: 4 additions & 16 deletions pkg/backend/httpstate/snapshot.go
Expand Up @@ -18,15 +18,13 @@ import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/pulumi/pulumi/pkg/v3/backend"
"github.com/pulumi/pulumi/pkg/v3/backend/httpstate/client"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

Expand Down Expand Up @@ -82,9 +80,6 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
return err
}
if err := persister.saveDiff(ctx, diff, token); err != nil {
if differ.strictMode {
return err
}
if logging.V(3) {
logging.V(3).Infof("ignoring error saving checkpoint "+
"with PatchUpdateCheckpointDelta, falling back to "+
Expand Down Expand Up @@ -139,17 +134,10 @@ func (cb *cloudBackend) newSnapshotPersister(ctx context.Context, update client.
sm: sm,
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH")) {
p.deploymentDiffState = newDeploymentDiffState()

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_STRICT")) {
p.deploymentDiffState.strictMode = true
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_NO_CHECKSUMS")) {
p.deploymentDiffState.noChecksums = true
}
caps := cb.capabilities(ctx)
deltaCaps := caps.deltaCheckpointUpdates
if deltaCaps != nil {
p.deploymentDiffState = newDeploymentDiffState(deltaCaps.CheckpointCutoffSizeBytes)
}

return p
}

0 comments on commit c37df22

Please sign in to comment.