Skip to content

Commit

Permalink
Check to make sure context isn't expired before doing a raft operatio…
Browse files Browse the repository at this point in the history
  • Loading branch information
ncabatoff authored and jartek committed Sep 11, 2021
1 parent fbccc96 commit 3a06991
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
3 changes: 3 additions & 0 deletions changelog/12162.txt
@@ -0,0 +1,3 @@
```release-note:improvement
storage/raft: Best-effort handling of cancelled contexts.
```
54 changes: 54 additions & 0 deletions physical/raft/raft.go
Expand Up @@ -978,6 +978,10 @@ func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error {
b.l.RLock()
defer b.l.RUnlock()

if err := ctx.Err(); err != nil {
return err
}

if b.disableAutopilot {
if b.raft == nil {
return errors.New("raft storage is not initialized")
Expand Down Expand Up @@ -1034,6 +1038,10 @@ func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, err
}

func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1068,6 +1076,10 @@ func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationR

// AddPeer adds a new server to the raft cluster
func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error {
if err := ctx.Err(); err != nil {
return err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1096,6 +1108,10 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e

// Peers returns all the servers present in the raft cluster
func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1180,6 +1196,10 @@ func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access)
// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
// raft.
func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error {
if err := ctx.Err(); err != nil {
return err
}

b.l.RLock()
defer b.l.RUnlock()

Expand Down Expand Up @@ -1214,6 +1234,11 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho
// Delete inserts an entry in the log to delete the given path
func (b *RaftBackend) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: []*LogOperation{
{
Expand All @@ -1238,9 +1263,17 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
return nil, errors.New("raft: fsm not configured")
}

if err := ctx.Err(); err != nil {
return nil, err
}

b.permitPool.Acquire()
defer b.permitPool.Release()

if err := ctx.Err(); err != nil {
return nil, err
}

entry, err := b.fsm.Get(ctx, path)
if entry != nil {
valueLen := len(entry.Value)
Expand All @@ -1258,6 +1291,11 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
// or if the call to applyLog fails.
func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: []*LogOperation{
{
Expand All @@ -1284,16 +1322,29 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error)
return nil, errors.New("raft: fsm not configured")
}

if err := ctx.Err(); err != nil {
return nil, err
}

b.permitPool.Acquire()
defer b.permitPool.Release()

if err := ctx.Err(); err != nil {
return nil, err
}

return b.fsm.List(ctx, prefix)
}

// Transaction applies all the given operations into a single log and
// applies it.
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())

if err := ctx.Err(); err != nil {
return err
}

command := &LogData{
Operations: make([]*LogOperation, len(txns)),
}
Expand Down Expand Up @@ -1330,6 +1381,9 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
if b.raft == nil {
return errors.New("raft storage is not initialized")
}
if err := ctx.Err(); err != nil {
return err
}

commandBytes, err := proto.Marshal(command)
if err != nil {
Expand Down

0 comments on commit 3a06991

Please sign in to comment.