Skip to content

Commit

Permalink
CosmosDBState - Opportunity to add the partitionKey value in the meta…
Browse files Browse the repository at this point in the history
…data request for the Query Method (uniform with Set and Get). Disabling the cross partition

Signed-off-by: luigirende <luigirende@gmail.com>
  • Loading branch information
luigirende committed Nov 22, 2023
1 parent ba5831b commit 16ed7d1
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 9 deletions.
14 changes: 12 additions & 2 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ func (p crossPartitionQueryPolicy) Do(req *policy.Request) (*http.Response, erro
// Check if we're performing a query
// In that case, remove the partitionkey header and enable cross-partition queries
if strings.ToLower(raw.Header.Get("x-ms-documentdb-query")) == "true" {
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true")
raw.Header.Del("x-ms-documentdb-partitionkey")
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "True")
// Only when the partitionKey is fake (true), it will be removed
if strings.ToLower(raw.Header.Get("x-ms-documentdb-partitionkey")) == "[true]" {
raw.Header.Del("x-ms-documentdb-partitionkey")
}
}
return req.Next()
}
Expand Down Expand Up @@ -206,6 +209,7 @@ func (c *StateStore) Features() []state.Feature {
state.FeatureTransactional,
state.FeatureQueryAPI,
state.FeatureTTL,
state.FeaturePartitionKey,
}
}

Expand Down Expand Up @@ -594,6 +598,12 @@ func (c *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state
return &state.QueryResponse{}, err
}

// If present partitionKey, the value will be used in the query disabling the cross partition
q.partitionKey = ""
if val, found := req.Metadata[metadataPartitionKey]; found {
q.partitionKey = val
}

data, token, err := q.execute(ctx, c.client)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions state/azure/cosmosdb/cosmosdb_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type InternalQuery struct {
}

type Query struct {
query InternalQuery
limit int
token string
query InternalQuery
limit int
token string
partitionKey string
}

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
Expand Down Expand Up @@ -155,7 +156,14 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) (
}

items := []CosmosItem{}
pk := azcosmos.NewPartitionKeyBool(true)

var pk azcosmos.PartitionKey
if q.partitionKey != "" {
pk = azcosmos.NewPartitionKeyString(q.partitionKey)
} else {
pk = azcosmos.NewPartitionKeyBool(true)
}

queryPager := client.NewQueryItemsPager(q.query.query, pk, opts)

token := ""
Expand Down
2 changes: 2 additions & 0 deletions state/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
FeatureQueryAPI Feature = "QUERY_API"
// FeatureTTL is the feature that supports TTLs.
FeatureTTL Feature = "TTL"
// FeaturePartitionKey is the feature that supports the partition
FeaturePartitionKey Feature = "PARTITION_KEY"
)

// Feature names a feature that can be implemented by state store components.
Expand Down
78 changes: 75 additions & 3 deletions tests/conformance/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ type scenario struct {
}

type queryScenario struct {
query string
results []state.QueryItem
query string
results []state.QueryItem
metadata map[string]string
partitionOnly bool
}

type TestConfig struct {
Expand Down Expand Up @@ -119,6 +121,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
value: ValueType{Message: fmt.Sprintf("test%s", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-2", key),
value: ValueType{Message: fmt.Sprintf("%stest", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-with-int", key),
value: intValueType{Message: 42},
Expand Down Expand Up @@ -235,6 +242,64 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
},
},
},

// In CosmosDB this query uses the cross partition ( value with 2 different partitionKey <key>-struct and <key>-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
// Return 2 item from 2 different partitionKey (<key>-struct and <key>-struct-2), for default the partitionKey is equals to key
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct", key),
Data: []byte(fmt.Sprintf(`{"message":"test%s"}`, key)),
},
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},

// Test for CosmosDB (filter test with partitionOnly) this query doesn't use the cross partition ( value from 2 different partitionKey %s-struct and %s-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
metadata: map[string]string{
"partitionKey": fmt.Sprintf("%s-struct-2", key),
},
partitionOnly: true,
// The same query from previous test but return only item having the same partitionKey value (%s-struct-2) given in the metadata
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},
}

t.Run("init", func(t *testing.T) {
Expand Down Expand Up @@ -300,10 +365,12 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
// Check if query feature is listed
features := statestore.Features()
require.True(t, state.FeatureQueryAPI.IsPresent(features))

querier, ok := statestore.(state.Querier)
assert.True(t, ok, "Querier interface is not implemented")
for _, scenario := range queryScenarios {
if (scenario.partitionOnly) && (!state.FeaturePartitionKey.IsPresent(features)) {
break
}
t.Logf("Querying value presence for %s", scenario.query)
var req state.QueryRequest
err := json.Unmarshal([]byte(scenario.query), &req.Query)
Expand All @@ -312,6 +379,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
metadata.ContentType: contenttype.JSONContentType,
metadata.QueryIndexName: "qIndx",
}

if val, found := scenario.metadata["partitionKey"]; found {
req.Metadata["partitionKey"] = val
}

resp, err := querier.Query(context.Background(), &req)
require.NoError(t, err)
assert.Equal(t, len(scenario.results), len(resp.Results))
Expand Down

0 comments on commit 16ed7d1

Please sign in to comment.