Skip to content

Commit

Permalink
Fixes first-write-wins with SQLite and MySQL
Browse files Browse the repository at this point in the history
dapr#3159 introduced a regression that caused sets with first-write-wins fail if the table was empty

The issue was due to a "FROM" statement in the CTE that should not have been there.

Also improved how the SQLite component detects etag failures when FWW is used

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Nov 2, 2023
1 parent 8680e27 commit 89e7087
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
1 change: 0 additions & 1 deletion state/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ func (m *MySQL) setValue(parentCtx context.Context, querier querier, req *state.
CURRENT_TIMESTAMP AS updateDate,
? AS eTag,
` + ttlQuery + ` AS expiredate
FROM ` + m.tableName + `
WHERE NOT EXISTS (
SELECT 1
FROM ` + m.tableName + `
Expand Down
51 changes: 28 additions & 23 deletions state/sqlite/sqlite_dbaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,6 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
}

// Only check for etag if FirstWrite specified (ref oracledatabaseaccess)
var (
res sql.Result
stmt string
)
ctx, cancel := context.WithTimeout(context.Background(), a.metadata.Timeout)
defer cancel()

Expand All @@ -350,10 +346,9 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
case !req.HasETag() && req.Options.Concurrency == state.FirstWrite:
// If the operation uses first-write concurrency, we need to handle the special case of a row that has expired but hasn't been garbage collected yet
// In this case, the row should be considered as if it were deleted
stmt = `WITH a AS (
stmt := `WITH a AS (
SELECT
?, ?, ?, ?, ` + expiration + `, CURRENT_TIMESTAMP
FROM ` + a.metadata.TableName + `
WHERE NOT EXISTS (
SELECT 1
FROM ` + a.metadata.TableName + `
Expand All @@ -362,17 +357,29 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
)
)
INSERT OR REPLACE INTO ` + a.metadata.TableName + `
SELECT * FROM a`
res, err = db.ExecContext(ctx, stmt, req.Key, requestValue, isBinary, newEtag, req.Key)
SELECT * FROM a
RETURNING 1`
var num int
err = db.QueryRowContext(ctx, stmt, req.Key, requestValue, isBinary, newEtag, req.Key).Scan(&num)
if err != nil {
// If no row was returned, it means no row was updated, so we had an etag failure
if errors.Is(err, sql.ErrNoRows) {
return state.NewETagError(state.ETagMismatch, nil)
}
return err
}

case !req.HasETag():
stmt = "INSERT OR REPLACE INTO " + a.metadata.TableName + `
stmt := "INSERT OR REPLACE INTO " + a.metadata.TableName + `
(key, value, is_binary, etag, update_time, expiration_time)
VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP, ` + expiration + `)`
res, err = db.ExecContext(ctx, stmt, req.Key, requestValue, isBinary, newEtag)
_, err = db.ExecContext(ctx, stmt, req.Key, requestValue, isBinary, newEtag)
if err != nil {
return err
}

default:
stmt = `UPDATE ` + a.metadata.TableName + ` SET
stmt := `UPDATE ` + a.metadata.TableName + ` SET
value = ?,
etag = ?,
is_binary = ?,
Expand All @@ -382,22 +389,20 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
key = ?
AND etag = ?
AND (expiration_time IS NULL OR expiration_time > CURRENT_TIMESTAMP)`
var res sql.Result
res, err = db.ExecContext(ctx, stmt, requestValue, newEtag, isBinary, req.Key, *req.ETag)
}
if err != nil {
return err
}
if err != nil {
return err
}

// Count the number of affected rows
rows, err := res.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
if req.HasETag() || req.Options.Concurrency == state.FirstWrite {
rows, err := res.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
// 0 affected rows means etag failure
return state.NewETagError(state.ETagMismatch, nil)
}
return errors.New("no item was updated")
}

return nil
Expand Down

0 comments on commit 89e7087

Please sign in to comment.