Skip to content

Commit

Permalink
CosmosDBState - Add, as option, the partitionKey in QueryMethod (#3227)
Browse files Browse the repository at this point in the history
Signed-off-by: luigirende <luigirende@gmail.com>
Signed-off-by: Luigi Rende <luigirende@gmail.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
luigirende and berndverst committed Jan 25, 2024
1 parent 805c1f8 commit 120a649
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 14 deletions.
14 changes: 12 additions & 2 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ func (p crossPartitionQueryPolicy) Do(req *policy.Request) (*http.Response, erro
// Check if we're performing a query
// In that case, remove the partitionkey header and enable cross-partition queries
if strings.ToLower(raw.Header.Get("x-ms-documentdb-query")) == "true" {
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true")
raw.Header.Del("x-ms-documentdb-partitionkey")
// Only when the partitionKey is fake (true), it will be removed amd enabled the cross partition
if strings.ToLower(raw.Header.Get("x-ms-documentdb-partitionkey")) == "[true]" {
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true")
raw.Header.Del("x-ms-documentdb-partitionkey")
}
}
return req.Next()
}
Expand Down Expand Up @@ -206,6 +209,7 @@ func (c *StateStore) Features() []state.Feature {
state.FeatureTransactional,
state.FeatureQueryAPI,
state.FeatureTTL,
state.FeaturePartitionKey,
}
}

Expand Down Expand Up @@ -594,6 +598,12 @@ func (c *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state
return &state.QueryResponse{}, err
}

// If present partitionKey, the value will be used in the query disabling the cross partition
q.partitionKey = ""
if val, found := req.Metadata[metadataPartitionKey]; found {
q.partitionKey = val
}

data, token, err := q.execute(ctx, c.client)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions state/azure/cosmosdb/cosmosdb_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type InternalQuery struct {
}

type Query struct {
query InternalQuery
limit int
token string
query InternalQuery
limit int
token string
partitionKey string
}

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
Expand Down Expand Up @@ -264,7 +265,14 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) (
}

items := []CosmosItem{}
pk := azcosmos.NewPartitionKeyBool(true)

var pk azcosmos.PartitionKey
if q.partitionKey != "" {
pk = azcosmos.NewPartitionKeyString(q.partitionKey)
} else {
pk = azcosmos.NewPartitionKeyBool(true)
}

queryPager := client.NewQueryItemsPager(q.query.query, pk, opts)

token := ""
Expand Down
2 changes: 2 additions & 0 deletions state/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
FeatureTTL Feature = "TTL"
// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
// FeaturePartitionKey is the feature that supports the partition
FeaturePartitionKey Feature = "PARTITION_KEY"
)

// Feature names a feature that can be implemented by state store components.
Expand Down
97 changes: 89 additions & 8 deletions tests/conformance/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type StructType struct {
Product struct {
Value int `json:"value"`
} `json:"product"`
Status string `json:"status"`
Status string `json:"status"`
Message string `json:"message"`
}

type intValueType struct {
Expand All @@ -62,8 +63,10 @@ type scenario struct {
}

type queryScenario struct {
query string
results []state.QueryItem
query string
results []state.QueryItem
metadata map[string]string
partitionOnly bool
}

type TestConfig struct {
Expand Down Expand Up @@ -130,14 +133,19 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
key: fmt.Sprintf("%s-struct-operations", key),
value: StructType{Product: struct {
Value int `json:"value"`
}{Value: 15}, Status: "ACTIVE"},
}{Value: 15}, Status: "ACTIVE", Message: fmt.Sprintf("%smessage", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-operations-inactive", key),
value: StructType{Product: struct {
Value int `json:"value"`
}{Value: 12}, Status: "INACTIVE"},
}{Value: 12}, Status: "INACTIVE", Message: fmt.Sprintf("%smessage", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-2", key),
value: ValueType{Message: fmt.Sprintf("%stest", key)},
contentType: contenttype.JSONContentType,
},
{
Expand Down Expand Up @@ -261,6 +269,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
{
"filter": {
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"GTE": {"product.value": 10}
},
Expand All @@ -277,7 +288,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-operations", key),
Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE"}`)),
Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE","message":"%smessage"}`, key)),
},
},
},
Expand All @@ -288,6 +299,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
"OR": [
{
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"GT": {"product.value": 11.1}
},
Expand All @@ -298,6 +312,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
},
{
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"LTE": {"product.value": 0.5}
},
Expand All @@ -313,7 +330,65 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-operations-inactive", key),
Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE"}`)),
Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE","message":"%smessage"}`, key)),
},
},
},

// In CosmosDB this query uses the cross partition ( value with 2 different partitionKey <key>-struct and <key>-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
// Return 2 item from 2 different partitionKey (<key>-struct and <key>-struct-2), for default the partitionKey is equals to key
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct", key),
Data: []byte(fmt.Sprintf(`{"message":"test%s"}`, key)),
},
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},

// Test for CosmosDB (filter test with partitionOnly) this query doesn't use the cross partition ( value from 2 different partitionKey %s-struct and %s-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
metadata: map[string]string{
"partitionKey": fmt.Sprintf("%s-struct-2", key),
},
partitionOnly: true,
// The same query from previous test but return only item having the same partitionKey value (%s-struct-2) given in the metadata
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},
Expand Down Expand Up @@ -382,10 +457,12 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
// Check if query feature is listed
features := statestore.Features()
require.True(t, state.FeatureQueryAPI.IsPresent(features))

querier, ok := statestore.(state.Querier)
assert.True(t, ok, "Querier interface is not implemented")
for _, scenario := range queryScenarios {
if (scenario.partitionOnly) && (!state.FeaturePartitionKey.IsPresent(features)) {
break
}
t.Logf("Querying value presence for %s", scenario.query)
var req state.QueryRequest
err := json.Unmarshal([]byte(scenario.query), &req.Query)
Expand All @@ -395,6 +472,10 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
metadata.QueryIndexName: "qIndx",
}

if val, found := scenario.metadata["partitionKey"]; found {
req.Metadata["partitionKey"] = val
}

resp, err := querier.Query(context.Background(), &req)
require.NoError(t, err)
assert.Equal(t, len(scenario.results), len(resp.Results))
Expand Down

0 comments on commit 120a649

Please sign in to comment.