Skip to content

Commit

Permalink
Merge #11666
Browse files Browse the repository at this point in the history
11666: Reduce memory use for PULUMI_OPTIMIZED_CHECKPOINT_PATCH r=t0yv0 a=t0yv0

<!--- 
Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation.
-->

# Description

<!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->

Fixes #11653

Inject fewer newlines and, per `@blampe`'s suggestion, never inject more than 1024 newlines even when there are more than 1024 resources. This caps the Myers diff matrix at 1024x1024, which should be about 8MB.

Promotes jsoniter from indirect to direct dependency. It's easier to work with for preserving indents and doing newline injection.


## Checklist

<!--- Please provide details if the checkbox below is to be left unchecked. -->
- [ ] I have added tests that prove my fix is effective or that my feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the Pulumi Service,
then the service should honor older versions of the CLI where this change would not exist.
You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrant bumping the Pulumi Service API version
  <!-- `@Pulumi` employees: If yes, you must submit corresponding changes in the service repo. -->


Co-authored-by: Anton Tayanovskyy <anton@pulumi.com>
  • Loading branch information
bors[bot] and t0yv0 committed Dec 15, 2022
2 parents 33cd3e3 + 4568157 commit ad74c80
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 135 deletions.
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: backend/service
description: "Fixes out-of-memory issues when using PULUMI_OPTIMIZED_CHECKPOINT_PATCH protocol"
48 changes: 32 additions & 16 deletions pkg/backend/httpstate/client/client_test.go
Expand Up @@ -14,11 +14,13 @@
package client

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
Expand Down Expand Up @@ -135,25 +137,25 @@ func TestGzip(t *testing.T) {

}

func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {
func TestPatchUpdateCheckpointVerbatimIndents(t *testing.T) {
t.Parallel()

deployment := apitype.DeploymentV3{
Resources: []apitype.ResourceV3{{URN: resource.URN("urn1")}},
Resources: []apitype.ResourceV3{
{URN: resource.URN("urn1")},
{URN: resource.URN("urn2")},
},
}

var indented json.RawMessage
{
indented1, err := json.MarshalIndent(deployment, "", "")
require.NoError(t, err)
untyped := apitype.UntypedDeployment{
Version: 3,
Deployment: indented1,
}
indented2, err := json.MarshalIndent(untyped, "", "")
require.NoError(t, err)
indented = indented2
}
var serializedDeployment json.RawMessage
serializedDeployment, err := json.Marshal(deployment)
assert.NoError(t, err)

untypedDeployment, err := json.Marshal(apitype.UntypedDeployment{
Version: 3,
Deployment: serializedDeployment,
})
assert.NoError(t, err)

var request apitype.PatchUpdateVerbatimCheckpointRequest

Expand All @@ -173,11 +175,25 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {

sequenceNumber := 1

err := client.PatchUpdateCheckpointVerbatim(context.Background(),
indented, err := MarshalUntypedDeployment(&deployment)
assert.NoError(t, err)

err = client.PatchUpdateCheckpointVerbatim(context.Background(),
UpdateIdentifier{}, sequenceNumber, indented, "token")
assert.NoError(t, err)

assert.Equal(t, string(indented), string(request.UntypedDeployment))
compacted := func(raw json.RawMessage) string {
var buf bytes.Buffer
err := json.Compact(&buf, []byte(raw))
assert.NoError(t, err)
return buf.String()
}

// Unlike the single line that json.Marshal would produce, the output should span multiple lines.
assert.Equal(t, 4, len(strings.Split(string(request.UntypedDeployment), "\n")))

// Compacting should recover the same form as json.Marshal would produce.
assert.Equal(t, string(untypedDeployment), compacted(request.UntypedDeployment))
}

func TestGetCapabilities(t *testing.T) {
Expand Down
104 changes: 96 additions & 8 deletions pkg/backend/httpstate/client/marshal.go
Expand Up @@ -17,19 +17,107 @@ package client
import (
"bytes"
"encoding/json"
"io"
"math"

"github.com/json-iterator/go"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
)

// Unlike json.Marshal preserves possible indentation in req.Deployment.
// maxNewLines is the newline budget for a serialized deployment. writeDeploymentV3 derives from it how often a
// newline is placed between consecutive resources.
const maxNewLines = 1024

// jsonIterConfig is the frozen jsoniter configuration shared by the marshaling helpers in this file. It is
// configured with SortMapKeys enabled.
var jsonIterConfig = jsoniter.Config{SortMapKeys: true}.Froze()

// MarshalUntypedDeployment serializes deployment to JSON in the apitype.UntypedDeployment format (a "version"
// field plus a "deployment" field) and returns the raw bytes.
//
// The encoder is meant for large checkpoints. It injects newlines into the output so that textual diffs over
// the JSON stay efficient. Those diffs currently need O(N^2) memory in the number of newlines, so injection is
// deliberately conservative: the output is limited to at most maxNewLines newlines, which is expected to keep
// the diff algorithm at roughly 8MB of memory.
//
// Any error reported while writing is returned and no bytes are returned in that case.
func MarshalUntypedDeployment(deployment *apitype.DeploymentV3) (json.RawMessage, error) {
	out := new(bytes.Buffer)
	m := marshalUntypedDeployment{deployment: deployment}
	if err := m.Write(out); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// marshalVerbatimCheckpointRequest serializes req to JSON using jsonIterConfig and returns the encoded bytes
// together with any encoding error.
//
// Unlike encoding/json, using jsoniter here will not reindent req.UntypedDeployment, which is what is needed
// for the Verbatim protocol.
//
// The earlier sentinel-substitution implementation (marshal with a `"*"` placeholder, then bytes.ReplaceAll)
// has been dropped: it left `pattern` and `err` declared but unused ahead of the jsoniter return, which Go
// rejects at compile time.
func marshalVerbatimCheckpointRequest(req apitype.PatchUpdateVerbatimCheckpointRequest) (json.RawMessage, error) {
	return jsonIterConfig.Marshal(req)
}

// marshalUntypedDeployment wraps a deployment so it can be written out as JSON through the Write,
// writeToStream and writeDeploymentV3 methods.
type marshalUntypedDeployment struct {
// deployment is the value being serialized; the methods dereference it without a nil check.
deployment *apitype.DeploymentV3
}

// Write serializes the wrapped deployment to w. It borrows a stream from jsonIterConfig for the duration of
// the call and hands it back before returning, propagating whatever error writeToStream reports.
func (c *marshalUntypedDeployment) Write(w io.Writer) error {
	s := jsonIterConfig.BorrowStream(w)
	defer jsonIterConfig.ReturnStream(s)
	return c.writeToStream(s)
}

// writeToStream emits the outer envelope `{"version":3,"deployment":<deployment>}` to stream, delegating the
// deployment body to writeDeploymentV3, and then flushes the stream.
//
// It returns the first error reported by writeDeploymentV3 or by the final Flush.
func (c *marshalUntypedDeployment) writeToStream(stream *jsoniter.Stream) error {
	stream.WriteObjectStart()          // writes `{`
	stream.WriteObjectField("version") // writes `"version":`
	stream.WriteInt(3)
	stream.WriteMore() // writes `,`
	stream.WriteObjectField("deployment")
	// This method has a single error result, so a failure is returned as-is (not as a `nil, err` pair).
	if err := c.writeDeploymentV3(stream); err != nil {
		return err
	}
	stream.WriteObjectEnd() // writes `}`
	return stream.Flush()
}

// writeDeploymentV3 writes the wrapped deployment to stream as a JSON object with these fields, in order:
//
//   - "manifest": always written.
//   - "secrets_providers": only when SecretsProviders is non-nil.
//   - "resources": only when there is at least one resource.
//   - "pendingOperations": only when there is at least one pending operation.
//
// Newlines are injected only inside the "resources" array: one after the opening `[`, one before the closing
// `]`, and some between elements. With at most maxNewLines-2 resources every element goes on its own line;
// beyond that a newline is emitted only before every newlinePeriod-th element, so the total stays within
// maxNewLines.
//
// It returns the first error reported by a Flush of the stream.
func (c *marshalUntypedDeployment) writeDeploymentV3(stream *jsoniter.Stream) (err error) {
	deployment := c.deployment
	stream.WriteObjectStart()
	stream.WriteObjectField("manifest")
	stream.WriteVal(deployment.Manifest)
	if deployment.SecretsProviders != nil {
		stream.WriteMore()
		stream.WriteObjectField("secrets_providers")
		stream.WriteVal(deployment.SecretsProviders)
	}
	if err = stream.Flush(); err != nil {
		return err
	}
	nResources := len(deployment.Resources)

	// Two newlines are reserved for the ones written right after `[` and right before `]`.
	maxNL := maxNewLines - 2
	// Number of resources per line once the budget is exceeded. This is 0 only when there are no resources,
	// in which case the loop below never runs and the modulo is never evaluated.
	newlinePeriod := int(math.Ceil(float64(nResources) / float64(maxNL)))

	if nResources > 0 {
		stream.WriteMore()
		stream.WriteObjectField("resources")
		stream.WriteRaw("[\n")
		for i, r := range deployment.Resources {
			if i > 0 {
				stream.WriteRaw(",")
				if (nResources <= maxNL) || (i%newlinePeriod == 0) {
					stream.WriteRaw("\n")
				}
			}
			stream.WriteVal(r)
			// Flushing after each resource presumably keeps the stream's pending buffer small for large
			// checkpoints; it also surfaces write errors early.
			if err = stream.Flush(); err != nil {
				return err
			}
		}
		stream.WriteRaw("\n]")
	}
	if len(deployment.PendingOperations) > 0 {
		stream.WriteMore()
		stream.WriteObjectField("pendingOperations")
		stream.WriteVal(deployment.PendingOperations)
	}
	// Close the deployment object and flush. There must be no early return between the optional fields and
	// this point, or the emitted JSON would be left unterminated.
	stream.WriteObjectEnd()
	return stream.Flush()
}
27 changes: 6 additions & 21 deletions pkg/backend/httpstate/diffs.go
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/hexops/gotextdiff/myers"
"github.com/hexops/gotextdiff/span"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"

opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -59,39 +57,30 @@ func (dds *deploymentDiffState) CanDiff() bool {

// Size-based heuristics trying to estimate if the diff method will be
// worth it and take less time than sending the entire deployment.
func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool {
func (dds *deploymentDiffState) ShouldDiff(new json.RawMessage) bool {
if !dds.CanDiff() {
return false
}
if len(dds.lastSavedDeployment) < dds.minimalDiffSize {
return false
}
if len(new.Deployment) < dds.minimalDiffSize {
if len(new) < dds.minimalDiffSize {
return false
}
return true
}

func (dds *deploymentDiffState) Diff(ctx context.Context,
deployment *apitype.UntypedDeployment) (deploymentDiff, error) {
func (dds *deploymentDiffState) Diff(ctx context.Context, deployment json.RawMessage) (deploymentDiff, error) {

if !dds.CanDiff() {
return deploymentDiff{}, fmt.Errorf("Diff() cannot be called before Saved()")
}

if deployment.Version == 0 {
return deploymentDiff{}, fmt.Errorf("deployment.Version should be set")
}

tracingSpan, childCtx := opentracing.StartSpanFromContext(ctx, "Diff")
defer tracingSpan.Finish()

before := dds.lastSavedDeployment

after, err := marshalUntypedDeployment(deployment)
if err != nil {
return deploymentDiff{}, fmt.Errorf("marshalUntypedDeployment failed: %v", err)
}
after := deployment

var checkpointHash string
checkpointHashReady := &sync.WaitGroup{}
Expand Down Expand Up @@ -125,12 +114,8 @@ func (dds *deploymentDiffState) Diff(ctx context.Context,
}

// Indicates that a deployment was just saved to the service.
func (dds *deploymentDiffState) Saved(ctx context.Context, deployment *apitype.UntypedDeployment) error {
d, err := marshalUntypedDeployment(deployment)
if err != nil {
return err
}
dds.lastSavedDeployment = d
func (dds *deploymentDiffState) Saved(ctx context.Context, deployment json.RawMessage) error {
dds.lastSavedDeployment = deployment
dds.sequenceNumber++

return nil
Expand Down
35 changes: 0 additions & 35 deletions pkg/backend/httpstate/marshal.go

This file was deleted.

20 changes: 3 additions & 17 deletions pkg/backend/httpstate/snapshot.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

Expand Down Expand Up @@ -62,7 +61,7 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
persister.context, persister.update, deploymentV3, token)
}

deployment, err := persister.marshalDeployment(deploymentV3)
deployment, err := client.MarshalUntypedDeployment(deploymentV3)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,23 +101,10 @@ func (persister *cloudSnapshotPersister) saveDiff(ctx context.Context,
}

func (persister *cloudSnapshotPersister) saveFullVerbatim(ctx context.Context,
differ *deploymentDiffState, deployment *apitype.UntypedDeployment, token string) error {
untypedDeploymentBytes, err := marshalUntypedDeployment(deployment)
if err != nil {
return err
}
differ *deploymentDiffState, deployment json.RawMessage, token string) error {
return persister.backend.client.PatchUpdateCheckpointVerbatim(
persister.context, persister.update, differ.SequenceNumber(),
untypedDeploymentBytes, token)
}

func (persister *cloudSnapshotPersister) marshalDeployment(
deployment *apitype.DeploymentV3) (*apitype.UntypedDeployment, error) {
raw, err := json.MarshalIndent(deployment, "", "")
if err != nil {
return nil, fmt.Errorf("serializing deployment to json: %w", err)
}
return &apitype.UntypedDeployment{Deployment: raw, Version: 3}, nil
deployment, token)
}

var _ backend.SnapshotPersister = (*cloudSnapshotPersister)(nil)
Expand Down

0 comments on commit ad74c80

Please sign in to comment.