Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check to make sure context isn't expired before doing a raft operation. #12162

Merged
merged 2 commits into from Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/12162.txt
@@ -0,0 +1,3 @@
```release-note:improvement
storage/raft: Best-effort handling of cancelled contexts.
```
54 changes: 54 additions & 0 deletions physical/raft/raft.go
Expand Up @@ -978,6 +978,10 @@ func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error {
b.l.RLock()
defer b.l.RUnlock()

if err := ctx.Err(); err != nil {
return err
}

if b.disableAutopilot {
if b.raft == nil {
return errors.New("raft storage is not initialized")
Expand Down Expand Up @@ -1034,6 +1038,10 @@ func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, err
}

func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1068,6 +1076,10 @@ func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationR

// AddPeer adds a new server to the raft cluster
func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error {
if err := ctx.Err(); err != nil {
return err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1096,6 +1108,10 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e

// Peers returns all the servers present in the raft cluster
func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1180,6 +1196,10 @@ func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access)
// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
// raft.
func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error {
if err := ctx.Err(); err != nil {
return err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1214,6 +1234,11 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho
// Delete inserts an entry in the log to delete the given path
func (b *RaftBackend) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: []*LogOperation{
{
Expand All @@ -1238,9 +1263,17 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
return nil, errors.New("raft: fsm not configured")
}

if err := ctx.Err(); err != nil {
return nil, err
}

b.permitPool.Acquire()
defer b.permitPool.Release()

if err := ctx.Err(); err != nil {
return nil, err
}

entry, err := b.fsm.Get(ctx, path)
if entry != nil {
valueLen := len(entry.Value)
Expand All @@ -1258,6 +1291,11 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
// or if the call to applyLog fails.
func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: []*LogOperation{
{
Expand All @@ -1284,16 +1322,29 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error)
return nil, errors.New("raft: fsm not configured")
}

if err := ctx.Err(); err != nil {
return nil, err
}

b.permitPool.Acquire()
defer b.permitPool.Release()

if err := ctx.Err(); err != nil {
return nil, err
}

return b.fsm.List(ctx, prefix)
}

// Transaction applies all the given operations into a single log and
// applies it.
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: make([]*LogOperation, len(txns)),
}
Expand Down Expand Up @@ -1330,6 +1381,9 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
if b.raft == nil {
return errors.New("raft storage is not initialized")
}
if err := ctx.Err(); err != nil {
return err
}

commandBytes, err := proto.Marshal(command)
if err != nil {
Expand Down