Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory use for PULUMI_OPTIMIZED_CHECKPOINT_PATCH #11666

Merged
merged 6 commits into from Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: backend/service
description: "Fixes out-of-memory issues when using PULUMI_OPTIMIZED_CHECKPOINT_PATCH protocol"
48 changes: 32 additions & 16 deletions pkg/backend/httpstate/client/client_test.go
Expand Up @@ -14,11 +14,13 @@
package client

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
Expand Down Expand Up @@ -135,25 +137,25 @@ func TestGzip(t *testing.T) {

}

func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {
func TestPatchUpdateCheckpointVerbatimIndents(t *testing.T) {
t.Parallel()

deployment := apitype.DeploymentV3{
Resources: []apitype.ResourceV3{{URN: resource.URN("urn1")}},
Resources: []apitype.ResourceV3{
{URN: resource.URN("urn1")},
{URN: resource.URN("urn2")},
},
}

var indented json.RawMessage
{
indented1, err := json.MarshalIndent(deployment, "", "")
require.NoError(t, err)
untyped := apitype.UntypedDeployment{
Version: 3,
Deployment: indented1,
}
indented2, err := json.MarshalIndent(untyped, "", "")
require.NoError(t, err)
indented = indented2
}
var serializedDeployment json.RawMessage
serializedDeployment, err := json.Marshal(deployment)
assert.NoError(t, err)

untypedDeployment, err := json.Marshal(apitype.UntypedDeployment{
Version: 3,
Deployment: serializedDeployment,
})
assert.NoError(t, err)

var request apitype.PatchUpdateVerbatimCheckpointRequest

Expand All @@ -173,11 +175,25 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {

sequenceNumber := 1

err := client.PatchUpdateCheckpointVerbatim(context.Background(),
indented, err := MarshalUntypedDeployment(&deployment)
assert.NoError(t, err)

err = client.PatchUpdateCheckpointVerbatim(context.Background(),
UpdateIdentifier{}, sequenceNumber, indented, "token")
assert.NoError(t, err)

assert.Equal(t, string(indented), string(request.UntypedDeployment))
compacted := func(raw json.RawMessage) string {
var buf bytes.Buffer
err := json.Compact(&buf, []byte(raw))
assert.NoError(t, err)
return buf.String()
}

// It should have more than one line as json.Marshal would produce.
assert.Equal(t, 4, len(strings.Split(string(request.UntypedDeployment), "\n")))

// Compacting should recover the same form as json.Marshal would produce.
assert.Equal(t, string(untypedDeployment), compacted(request.UntypedDeployment))
}

func TestGetCapabilities(t *testing.T) {
Expand Down
101 changes: 92 additions & 9 deletions pkg/backend/httpstate/client/marshal.go
Expand Up @@ -17,19 +17,102 @@ package client
import (
"bytes"
"encoding/json"
"io"
"math"

"github.com/json-iterator/go"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
)

// Unlike json.Marshal preserves possible indentation in req.Deployment.
func marshalVerbatimCheckpointRequest(req apitype.PatchUpdateVerbatimCheckpointRequest) (json.RawMessage, error) {
sentinel := []byte(`"*"`)
cp := req
cp.UntypedDeployment = sentinel
pattern, err := json.Marshal(cp)
if err != nil {
var jsonIterConfig = jsoniter.Config{SortMapKeys: true}.Froze()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could go either way on this.

My gut says unless jsonIterConfig is mutable (Froze() indicates to me it isn't), I'd throw it in a function so no one tries to mutate it.

On the other hand, Froze() will make it apparent the minute anyone tries to mutate it (as long as they test their change), and putting this in a function will create 1 unnecessary allocation each time Write is called. Probably better to leave it as is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a strange lib. Froze() allocates state/cache/pools. This needs to be a module-level var I think for best results (reuse of the pools).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM! IMO module level variables is typically an anti-pattern, except when it isn't. Which is Golang in a nutshell... :\

This is a good example of when it isn't.


// Marshals to canonical JSON in the apitype.UntypedDeployment format.
//
// Optimized for large checkpoints.
//
// Injects newlines to allow efficient textual diffs over the JSON. Textual diffs currently use O(N^2) memory in the
// number of newlines, so the injection needs to be conservative. Currently it limits to up to 1024 newlines which would
// result in max 8MB memory use by the algorithm.
func MarshalUntypedDeployment(deployment *apitype.DeploymentV3) (json.RawMessage, error) {
var buf bytes.Buffer
md := &marshalUntypedDeployment{deployment}
t0yv0 marked this conversation as resolved.
Show resolved Hide resolved
if err := md.Write(&buf); err != nil {
return nil, err
}
f := bytes.ReplaceAll(pattern, sentinel, req.UntypedDeployment)
return f, nil
return buf.Bytes(), nil
}

func marshalVerbatimCheckpointRequest(req apitype.PatchUpdateVerbatimCheckpointRequest) (json.RawMessage, error) {
// Unlike encoding/json, using jsonIter here will not reindent req.UntypedDeployment, which is what is needed
// for the Verbatim protocol.
return jsonIterConfig.Marshal(req)
}

type marshalUntypedDeployment struct {
deployment *apitype.DeploymentV3
}

func (c *marshalUntypedDeployment) Write(w io.Writer) error {
cfg := jsonIterConfig
stream := cfg.BorrowStream(w)
defer cfg.ReturnStream(stream)
err := c.writeToStream(stream)
return err
}

func (c *marshalUntypedDeployment) writeToStream(stream *jsoniter.Stream) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no comprehension of the incantation order here 😅 For example, what does WriteMore() do? Why is it necessary? Maybe a single line comment somewhere at the top of the method would help? Not a big deal either way, I feel like someone coming across this is going to have to read the API docs anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WriteMore writes a "," .. this is a pretty low-level API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

stream.WriteObjectStart()
stream.WriteObjectField("version")
stream.WriteInt(3)
stream.WriteMore()
stream.WriteObjectField("deployment")
c.writeDeploymentV3(stream)
stream.WriteObjectEnd()
return stream.Flush()
}

func (c *marshalUntypedDeployment) writeDeploymentV3(stream *jsoniter.Stream) (err error) {
deployment := c.deployment
stream.WriteObjectStart()
stream.WriteObjectField("manifest")
stream.WriteVal(deployment.Manifest)
if deployment.SecretsProviders != nil {
stream.WriteMore()
stream.WriteObjectField("secrets_providers")
stream.WriteVal(deployment.SecretsProviders)
}
if err = stream.Flush(); err != nil {
return err
}
nResources := len(deployment.Resources)

maxNewlines := 1024 - 2
t0yv0 marked this conversation as resolved.
Show resolved Hide resolved
newlinePeriod := int(math.Ceil(float64(nResources) / float64(maxNewlines)))
t0yv0 marked this conversation as resolved.
Show resolved Hide resolved

if nResources > 0 {
stream.WriteMore()
stream.WriteObjectField("resources")
stream.WriteRaw("[\n")
for i, r := range deployment.Resources {
if i > 0 {
stream.WriteRaw(",")
if (nResources <= maxNewlines) || (i%newlinePeriod == 0) {
stream.WriteRaw("\n")
}
}
stream.WriteVal(r)
if err = stream.Flush(); err != nil {
return err
}
}
stream.WriteRaw("\n]")
}
if len(deployment.PendingOperations) > 0 {
stream.WriteMore()
stream.WriteObjectField("pendingOperations")
stream.WriteVal(deployment.PendingOperations)
}
stream.WriteObjectEnd()
return stream.Flush()
}
27 changes: 6 additions & 21 deletions pkg/backend/httpstate/diffs.go
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/hexops/gotextdiff/myers"
"github.com/hexops/gotextdiff/span"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"

opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -59,39 +57,30 @@ func (dds *deploymentDiffState) CanDiff() bool {

// Size-based heuristics trying to estimate if the diff method will be
// worth it and take less time than sending the entire deployment.
func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool {
func (dds *deploymentDiffState) ShouldDiff(new json.RawMessage) bool {
if !dds.CanDiff() {
return false
}
if len(dds.lastSavedDeployment) < dds.minimalDiffSize {
return false
}
if len(new.Deployment) < dds.minimalDiffSize {
if len(new) < dds.minimalDiffSize {
return false
}
return true
}

func (dds *deploymentDiffState) Diff(ctx context.Context,
deployment *apitype.UntypedDeployment) (deploymentDiff, error) {
func (dds *deploymentDiffState) Diff(ctx context.Context, deployment json.RawMessage) (deploymentDiff, error) {

if !dds.CanDiff() {
return deploymentDiff{}, fmt.Errorf("Diff() cannot be called before Saved()")
}

if deployment.Version == 0 {
return deploymentDiff{}, fmt.Errorf("deployment.Version should be set")
}

tracingSpan, childCtx := opentracing.StartSpanFromContext(ctx, "Diff")
defer tracingSpan.Finish()

before := dds.lastSavedDeployment

after, err := marshalUntypedDeployment(deployment)
if err != nil {
return deploymentDiff{}, fmt.Errorf("marshalUntypedDeployment failed: %v", err)
}
after := deployment

var checkpointHash string
checkpointHashReady := &sync.WaitGroup{}
Expand Down Expand Up @@ -125,12 +114,8 @@ func (dds *deploymentDiffState) Diff(ctx context.Context,
}

// Indicates that a deployment was just saved to the service.
func (dds *deploymentDiffState) Saved(ctx context.Context, deployment *apitype.UntypedDeployment) error {
d, err := marshalUntypedDeployment(deployment)
if err != nil {
return err
}
dds.lastSavedDeployment = d
func (dds *deploymentDiffState) Saved(ctx context.Context, deployment json.RawMessage) error {
dds.lastSavedDeployment = deployment
dds.sequenceNumber++

return nil
Expand Down
35 changes: 0 additions & 35 deletions pkg/backend/httpstate/marshal.go

This file was deleted.

20 changes: 3 additions & 17 deletions pkg/backend/httpstate/snapshot.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

Expand Down Expand Up @@ -62,7 +61,7 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
persister.context, persister.update, deploymentV3, token)
}

deployment, err := persister.marshalDeployment(deploymentV3)
deployment, err := client.MarshalUntypedDeployment(deploymentV3)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,23 +101,10 @@ func (persister *cloudSnapshotPersister) saveDiff(ctx context.Context,
}

func (persister *cloudSnapshotPersister) saveFullVerbatim(ctx context.Context,
differ *deploymentDiffState, deployment *apitype.UntypedDeployment, token string) error {
RobbieMcKinstry marked this conversation as resolved.
Show resolved Hide resolved
untypedDeploymentBytes, err := marshalUntypedDeployment(deployment)
if err != nil {
return err
}
differ *deploymentDiffState, deployment json.RawMessage, token string) error {
return persister.backend.client.PatchUpdateCheckpointVerbatim(
persister.context, persister.update, differ.SequenceNumber(),
untypedDeploymentBytes, token)
}

func (persister *cloudSnapshotPersister) marshalDeployment(
deployment *apitype.DeploymentV3) (*apitype.UntypedDeployment, error) {
raw, err := json.MarshalIndent(deployment, "", "")
if err != nil {
return nil, fmt.Errorf("serializing deployment to json: %w", err)
}
return &apitype.UntypedDeployment{Deployment: raw, Version: 3}, nil
deployment, token)
}

var _ backend.SnapshotPersister = (*cloudSnapshotPersister)(nil)
Expand Down