Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecuteBatchCAS not applied, MapExecuteBatchCAS is applied #1746

Open
danthegoodman1 opened this issue Apr 7, 2024 · 11 comments
Open

ExecuteBatchCAS not applied, MapExecuteBatchCAS is applied #1746

danthegoodman1 opened this issue Apr 7, 2024 · 11 comments

Comments

@danthegoodman1
Copy link

Using the latest version of scylla on docker, I have the following code:

	b := tx.session.NewBatch(gocql.UnloggedBatch)

	for key, val := range tx.pendingWrites {
		// Write rowLock and data to primary key
		lock := rowLock{
			PrimaryLockKey: tx.primaryLockKey,
			StartTs:        tx.readTime.UnixNano(),
			TimeoutTs:      tx.readTime.Add(time.Second * 5).UnixNano(),
		}

		encodedLock, err := lock.Encode()
		if err != nil {
			return fmt.Errorf("error in rowLock.Encode: %w", err)
		}

		// Insert the lock
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			Args: []any{key, []byte(encodedLock)},
		})

		// Insert the data record
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Args: []any{key, tx.readTime.UnixNano(), val},
		})
	}

	// Non-map version was always having applied: false
	applied, _, err := tx.session.MapExecuteBatchCAS(b, make(map[string]interface{}))
	if err != nil {
		return fmt.Errorf("error in MapExecuteBatchCAS: %w", err)
	}

	if !applied {
		return fmt.Errorf("%w: prewrite not applied (confict)", &TxnAborted{})
	}

When using ExecuteBatchCAS(b), the applied return would always be false, but when using MapExecuteBatchCAS(b, make(map[string]interface{})) (despite not actaully binding to anything), applied would be true.

Why is this? My understand is that it si a difference of binding only.

@testisnullus
Copy link

@danthegoodman1 Hello, I am investigating this issue. For now, I can verify that ExecuteBatchCAS() and MapExecuteBatchCAS() functions are working correctly.

When using MapExecuteBatchCAS(), the function involves passing a map which is intended to hold any values returned by the operation if the batch is not applied. The typical usage of MapExecuteBatchCAS() involves passing a map (even the empty one, like make(map[string]interface{})) that will store column values from the row that prevented the batch from being applied successfully.

Also, If the data state matches the condition (like IF NOT EXISTS in this particular case), the MapExecuteBatchCAS() operation can return applied = true, in case the data did not need any modification or was modified as expected. But, if the data state does not match, you cas see applied = false, indicating the batch was not applied because conditions weren't met.

@danthegoodman1
Copy link
Author

danthegoodman1 commented Apr 30, 2024

@testisnullus I'm not sure you've understood the issue, or rather I was likely not clear enough. The issue is that ExecuteBatchCAS is returning applied false, when it should be true (there were no issues).

@testisnullus
Copy link

@danthegoodman1 I am testing on the Cassandra 4.1.4 version. The ExecuteBatchCAS() function works without any issues for me. I've tried to execute it with some data that is not in the table to bypass the IF NOT EXISTS statement provided in the CQL query:

		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("INSERT INTO \"%s\" (name, study_date, sleep_time_hours) VALUES (?, ?, ?) IF NOT EXISTS", tx.table),
			Args: []any{key, studyDate, sleepTimeHours32},
		})

		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("UPDATE \"%s\" SET sleep_time_hours=? WHERE name=? AND study_date=?", tx.table),
			Args: []any{sleepTimeHours32, key, "2023-04-30"},
		})

I've called the ExecuteBatchCAS() func the same way:

	applied, _, err := tx.session.ExecuteBatchCAS(b)
	if err != nil {
		return fmt.Errorf("error in ExecuteBatchCAS: %w", err)
	}

If I println the applied variable here, it appears true in this case.

Could you please provide more information about your issues with the ExecuteBatchCAS() function? Do you have any error messages related to this operation?

@danthegoodman1
Copy link
Author

No errors, I noticed purely that it would never apply (checking the DB shows that it indeed did not apply), but if I simply swapped it with the Map- version it would apply.

@danthegoodman1
Copy link
Author

Here is the keyspace and table I used

func createKeyspace(s *gocql.Session) error {
	if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}").Exec(); err != nil {
		return fmt.Errorf("error creating keyspace: %w", err)
	}

	return nil
}

func createTable(s *gocql.Session) error {
	if err := s.Query("create table if not exists testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return nil
}

@testisnullus
Copy link

@danthegoodman1 Hello, I've tested the ExecuteBatchCAS() with your keyspace and table examples.

The table looks like this after batch operation:

ra@cqlsh> SELECT * FROM test.testkv;

 key  | col | ts                  | val
------+-----+---------------------+--------------------------
 key1 |   l |                   0 | 0x656e636f6465644c6f636b
 key1 |   d | 1714566727267344355 |           0x76616c756531

(2 rows)

I can share the full code snippet with you also:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

type Tx struct {
	session       *gocql.Session
	pendingWrites map[string][]byte
	readTime      time.Time
	table         string
}

type TxnAborted struct{}

func (e *TxnAborted) Error() string {
	return "Transaction aborted due to write conflict"
}

func createKeyspace(s *gocql.Session) error {
	if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}").Exec(); err != nil {
		return fmt.Errorf("error creating keyspace: %w", err)
	}

	return nil
}

func createTable(s *gocql.Session) error {
	if err := s.Query("create table if not exists testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return nil
}

func (tx *Tx) ExecuteBatchTransaction() error {
	b := tx.session.NewBatch(gocql.UnloggedBatch)

	for key, val := range tx.pendingWrites {
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			Args: []any{key, []byte("encodedLock")},
		})

		// Insert the data record
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Args: []any{key, tx.readTime.UnixNano(), val},
		})
	}

	applied, _, err := tx.session.ExecuteBatchCAS(b)
	if err != nil {
		return fmt.Errorf("error in ExecuteBatchCAS: %w", err)
	}

	if !applied {
		return fmt.Errorf("%w: prewrite not applied (conflict)", &TxnAborted{})
	}

	fmt.Println(applied)

	return nil
}

func main() {
	// connect to the cluster
	cluster := gocql.NewCluster("host1", "host2", "host3")
	cluster.Consistency = gocql.Quorum
	cluster.ProtoVersion = 4
	cluster.ConnectTimeout = time.Second * 10
	cluster.Keyspace = "test"
	cluster.Authenticator = gocql.PasswordAuthenticator{Username: "name", Password: "pass", AllowedAuthenticators: []string{"com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator"}}
	session, err := cluster.CreateSession()
	if err != nil {
		log.Println(err)
		return
	}
	defer session.Close()

	err = createKeyspace(session)
	if err != nil {
		fmt.Println("Keyspace failed:", err)
	}

	err = createTable(session)
	if err != nil {
		fmt.Println("Table failed:", err)
	}

	tx := Tx{
		session:       session,
		pendingWrites: map[string][]byte{"key1": []byte("value1")},
		readTime:      time.Now(),
		table:         "testkv",
	}

	err = tx.ExecuteBatchTransaction()
	if err != nil {
		fmt.Println("Transaction failed:", err)
	}
}

The applied variable is also true in this case.

@testisnullus
Copy link

testisnullus commented May 1, 2024

Just tested on the latest ScyllaDB image (scylladb/scylla:latest), and it also works well without any issues.

What version of GoCQL and Scylla do you use now?

@danthegoodman1
Copy link
Author

Scylla is still latest, gocql was version v1.6.0

@danthegoodman1
Copy link
Author

I was also quite confused considering I changed nothing on the query and simply swapped out ExecuteBatchCAS for MapExecuteBatchCAS, no other changes.

@testisnullus
Copy link

@danthegoodman1 I cannot reproduce the issue for now :(
Does the issue affect executing queries for you or can you easily use the MapExecuteBatchCAS() function for your operations? I will be tracking this issue and will try to test the ExecuteBatchCAS() more closely to identify this problem's root cause.

@danthegoodman1
Copy link
Author

danthegoodman1 commented May 7, 2024 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants