Skip to content

Commit

Permalink
feat!: Add path variable support to GCS destination plugin. (#17602)
Browse files Browse the repository at this point in the history
Fixes #15097. Since the PR is fairly large, please review it commit by
commit, reading each commit message description to get a
better sense of the changes.

#### Summary

Add path variable support to the GCS destination plugin. **This is a
breaking change**. We check whether the path string contains any of the template
variables, such as {{FORMAT}}, {{TABLE}} and so on. If it does, we follow
the behaviour of the S3 plugin: we replace the template variables and
do nothing else. If there are no such variables, we fall back to the current
behaviour in the code block shown above.

Please read the issue comments to check on how the decisions for this PR
were made.

This PR follows a similar approach to the S3 plugin. I've added some extra
tests for client.read() errors, which we should probably copy back to the S3
and File plugins in a follow-up issue.

Note: I have not made additions to CHANGELOG.md; I'm assuming the
maintainer will do it just before merge, to make sure the
right tag is used at the time of merge. Please let me know if I need to make that change.

<details> 
<summary> GCS screenshot for path `new/{{TABLE}}/{{UUID}}.{{FORMAT}}`
</summary>

![Screenshot 2024-04-12 at 1 46
50 AM](https://github.com/cloudquery/cloudquery/assets/13910561/6b17748b-6686-4c42-8e9d-4d40a2c270b3)


</details>

- [x] Read the [contribution
guidelines](https://github.com/cloudquery/cloudquery/blob/main/CONTRIBUTING.md)
🧑‍🎓
- [x] Run `make lint` to ensure the proposed changes follow the coding
style 🚨 (install golangci-lint
[here](https://golangci-lint.run/usage/install/#local-installation))
- [x] Run `make test` to ensure the proposed changes pass the tests 🧪
- [x] If changing a source plugin run `make gen` to ensure docs are up
to date 📝
- [x] Ensure the status checks below are successful ✅

---------

Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
  • Loading branch information
shubham-padia and erezrokah committed Apr 12, 2024
1 parent 9b9ae21 commit 1ea5680
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 15 deletions.
4 changes: 3 additions & 1 deletion plugins/destination/gcs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Client struct {
streamingbatchwriter.UnimplementedDeleteStale
streamingbatchwriter.UnimplementedDeleteRecords

syncID string
logger zerolog.Logger
spec *spec.Spec

Expand All @@ -32,9 +33,10 @@ type Client struct {
writer *streamingbatchwriter.StreamingBatchWriter
}

func New(ctx context.Context, logger zerolog.Logger, s []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
func New(ctx context.Context, logger zerolog.Logger, s []byte, newClientOpts plugin.NewClientOptions) (plugin.Client, error) {
c := &Client{
logger: logger.With().Str("module", "gcs").Logger(),
syncID: newClientOpts.InvocationID,
}

if err := json.Unmarshal(s, &c.spec); err != nil {
Expand Down
30 changes: 26 additions & 4 deletions plugins/destination/gcs/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const bucket = "cq-dest-gcs"
Expand All @@ -39,9 +40,28 @@ func TestPlugin(t *testing.T) {
})

t.Run("write/"+string(ft), func(t *testing.T) {
testPluginCustom(t, &s)
testPluginCustom(t, &s, "")
})
}
t.Run("should give an error while reading when no_rotate is false", func(t *testing.T) {
s := spec.Spec{
Bucket: bucket,
Path: t.TempDir(),
NoRotate: false,
FileSpec: filetypes.FileSpec{Format: filetypes.FormatTypeCSV},
}
testPluginCustom(t, &s, "reading is not supported when `no_rotate` is false")
})

t.Run("should give an error while reading when {{UUID}} path variable is present", func(t *testing.T) {
s := spec.Spec{
Bucket: bucket,
Path: "{{UUID}}",
NoRotate: true,
FileSpec: filetypes.FileSpec{Format: filetypes.FormatTypeCSV},
}
testPluginCustom(t, &s, "reading is not supported when path contains uuid variable")
})
}

func testPlugin(t *testing.T, s *spec.Spec) {
Expand All @@ -66,7 +86,7 @@ func testPlugin(t *testing.T, s *spec.Spec) {
)
}

func testPluginCustom(t *testing.T, s *spec.Spec) {
func testPluginCustom(t *testing.T, s *spec.Spec, readErrorString string) {
ctx := context.Background()

var client plugin.Client
Expand Down Expand Up @@ -119,10 +139,12 @@ func testPluginCustom(t *testing.T, s *spec.Spec) {
}

readRecords, err := readAll(ctx, client, table)
if err != nil {
t.Fatal(fmt.Errorf("failed to sync: %w", err))
if readErrorString != "" {
require.ErrorContains(t, err, readErrorString)
return
}

require.NoError(t, err)
totalItems := plugin.TotalRows(readRecords)
assert.Equalf(t, int64(2), totalItems, "expected 2 items, got %d", totalItems)
}
Expand Down
9 changes: 8 additions & 1 deletion plugins/destination/gcs/client/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
)

func (c *Client) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error {
if !c.spec.NoRotate {
return fmt.Errorf("reading is not supported when `no_rotate` is false. Table: %q", table.Name)
}
name := fmt.Sprintf("%s/%s.%s%s", c.spec.Path, table.Name, c.spec.Format, c.spec.FileSpec.Compression.Extension())
if c.spec.PathContainsUUID() {
return fmt.Errorf("reading is not supported when path contains uuid variable. Table: %q", table.Name)
}

name := c.spec.ReplacePathVariables(table.Name, uuid.NewString(), time.Time{}, c.syncID)

r, err := c.bucket.Object(name).NewReader(ctx)
if err != nil {
return err
Expand Down
104 changes: 103 additions & 1 deletion plugins/destination/gcs/client/spec/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,109 @@ func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) {
},
}

sc.AllOf = append(sc.AllOf, noRotateNoBatch)
// path patterns: should be a clean path
cleanPath := &jsonschema.Schema{
Title: "`path` is a clean path value",
Not: &jsonschema.Schema{
Title: "`path` is not a clean path value",
AnyOf: []*jsonschema.Schema{
{
Title: "`path` contains `./`",
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("path", &jsonschema.Schema{
Type: "string",
Pattern: `^.*\./.*$`,
})
return properties
}(),
},
{
Title: "`path` contains `//`",
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("path", &jsonschema.Schema{
Type: "string",
Pattern: `^.*//.*$`,
})
return properties
}(),
},
},
},
}

pathWithUUID := &jsonschema.Schema{
Title: "Require {{UUID}} to be present in path",
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
// we make the non-zero requirement, so we want to allow only null here
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("path", &jsonschema.Schema{
Type: "string",
Pattern: `^.*\{\{UUID\}\}.*$`,
})
return properties
}(),
}
// no_rotate:true -> no {{UUID}} should be present in path
noRotateNoUUID := &jsonschema.Schema{
Title: "Disallow {{UUID}} in path when using no_rotate",
If: &jsonschema.Schema{
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
noRotate := *sc.Properties.Value("no_rotate")
noRotate.Default = nil
noRotate.Const = true
noRotate.Description = ""
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("no_rotate", &noRotate)
return properties
}(),
Required: []string{"no_rotate"},
},
Then: &jsonschema.Schema{
Not: pathWithUUID,
},
}

/* batching enabled -> require {{UUID}} in path or require no path variables in path,
since we will use UUID by default if batch */
uuidWhenBatching := &jsonschema.Schema{
Title: "Require {{UUID}} in path when batching",
If: &jsonschema.Schema{
// It's enough to disallow setting no_rotate to true
// As otherwise we're requiring the positive batch size (& bytes) values
Title: "Disallow setting no_rotate to true",
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
noRotate := *sc.Properties.Value("no_rotate")
noRotate.Default = nil
noRotate.Const = false
noRotate.Description = ""
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("no_rotate", &noRotate)
return properties
}(),
},
Then: &jsonschema.Schema{
AnyOf: []*jsonschema.Schema{
pathWithUUID,
{
Title: "`path` does not contain path variables",
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
properties := orderedmap.New[string, *jsonschema.Schema]()
properties.Set("path", &jsonschema.Schema{
Not: &jsonschema.Schema{
Type: "string",
Pattern: `^.*{{.*}}.*$`,
},
})
return properties
}(),
},
},
},
}

sc.AllOf = append(sc.AllOf, noRotateNoBatch, cleanPath, noRotateNoUUID, uuidWhenBatching)
}

//go:embed schema.json
Expand Down
87 changes: 87 additions & 0 deletions plugins/destination/gcs/client/spec/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 44 additions & 1 deletion plugins/destination/gcs/client/spec/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,26 @@ func TestSpecJSONSchema(t *testing.T) {
Spec: `{"format": "csv", "path": "abc", "bucket": 123}`,
Err: true,
},

{
Name: "path starts with /",
Spec: `{"format": "csv", "path": "/{{UUID}}", "bucket": "b", "region": "r"}`,
Err: true,
},
{
Name: "path contains //",
Spec: `{"format": "csv", "path": "{{UUID}}//", "bucket": "b", "region": "r"}`,
Err: true,
},
{
Name: "path contains ./",
Spec: `{"format": "csv", "path": "{{UUID}}/./", "bucket": "b", "region": "r"}`,
Err: true,
},
{
Name: "path contains ../",
Spec: `{"format": "csv", "path": "{{UUID}}/../", "bucket": "b", "region": "r"}`,
Err: true,
},
{
Name: "null no_rotate",
Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": null}`,
Expand Down Expand Up @@ -143,6 +162,30 @@ func TestSpecJSONSchema(t *testing.T) {
Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_timeout":null}`,
},

// no_rotate + path({{UUID}})
{
Name: "no_rotate:false & path:{{UUID}}",
Spec: `{"format": "csv", "path": "{{UUID}}", "bucket": "b", "no_rotate":false}`,
},
{
Name: "no_rotate:true & path:{{UUID}}",
Spec: `{"format": "csv", "path": "{{UUID}}", "bucket": "b", "no_rotate":true}`,
Err: true,
},
{
Name: "no_rotate:false & path:abc",
Spec: `{"format": "csv", "path": "abc", "bucket": "b", "no_rotate":false}`,
},
{
Name: "no_rotate:true & path:abc",
Spec: `{"format": "csv", "path": "abc", "bucket": "b", "no_rotate":true}`,
},
{
Name: "no_rotate:false & path:{{TABLE}}",
Spec: `{"format": "csv", "path": "{{TABLE}}", "bucket": "b", "no_rotate":false}`,
Err: true,
},

// no_rotate + batching
{
Name: "no_rotate:false & batch_size:100",
Expand Down

0 comments on commit 1ea5680

Please sign in to comment.