Skip to content

Commit

Permalink
Merge #62222
Browse files Browse the repository at this point in the history
62222: colflow: fix emitting of flow stats and simplify the creator a bit r=yuzefovich a=yuzefovich

We introduced `isGatewayNode` boolean on the flow creator a couple of
months ago which is used to decide whether the last outbox should emit
the flow level memory and disk stats. However, we forget to properly
initialize it in the constructor, and this commit fixes that issue. The
impact though is very limited - we would just emit some redundant stats,
and because they are used for maximum, they would get ignored.

Additionally, this commit simplifies the flow creator by removing
`materializerAdded` field because it is essentially a duplicate of
`isGatewayNode`.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Mar 23, 2021
2 parents ff01389 + 23958e5 commit 7b4bdfb
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions pkg/sql/colflow/vectorized_flow.go
Expand Up @@ -382,7 +382,8 @@ type flowCreatorHelper interface {
// accumulateAsyncComponent stores a component (either a router or an outbox)
// to be run asynchronously.
accumulateAsyncComponent(runFn)
// addMaterializer adds a materializer to the flow.
// addMaterializer adds a root materializer to the flow. This is only done
// on the gateway node.
addMaterializer(*colexec.Materializer)
// getCancelFlowFn returns a flow cancellation function.
getCancelFlowFn() context.CancelFunc
Expand Down Expand Up @@ -450,18 +451,16 @@ type vectorizedFlowCreator struct {
exprHelper *colexecargs.ExprHelper
typeResolver descs.DistSQLTypeResolver

// numOutboxes counts how many exec.Outboxes have been set up on this node.
// It must be accessed atomically.
numOutboxes int32
materializerAdded bool

// numOutboxesExited is an atomic that keeps track of how many outboxes have exited.
// When numOutboxesExited equals numOutboxes, the cancellation function for the flow
// is called.
// numOutboxes counts how many colrpc.Outbox'es have been set up on this
// node. It must be accessed atomically.
numOutboxes int32
// numOutboxesExited is an atomic that keeps track of how many outboxes have
// exited. When numOutboxesExited equals numOutboxes, the cancellation
// function for the flow is called on the non-gateway nodes.
numOutboxesExited int32
// numOutboxesDrained is an atomic that keeps track of how many outboxes have
// been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is
// added to a flow-level span.
// numOutboxesDrained is an atomic that keeps track of how many outboxes
// have been drained. When numOutboxesDrained equals numOutboxes, flow-level
// metadata is added to a flow-level span on the non-gateway nodes.
numOutboxesDrained int32

// procIdxQueue is a queue of indices into processorSpecs (the argument to
Expand Down Expand Up @@ -525,6 +524,7 @@ func newVectorizedFlowCreator(
streamIDToInputOp: creator.streamIDToInputOp,
streamIDToSpecIdx: creator.streamIDToSpecIdx,
recordingStats: recordingStats,
isGatewayNode: isGatewayNode,
waitGroup: waitGroup,
syncFlowConsumer: syncFlowConsumer,
nodeDialer: nodeDialer,
Expand Down Expand Up @@ -665,12 +665,12 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
// When the last Outbox on this node exits, we want to make sure that
// everything is shutdown; namely, we need to call cancelFn if:
// - it is the last Outbox
// - there is no root materializer on this node (if it were, it would take
// care of the cancellation itself)
// - the node is not the gateway (there is a root materializer on the
// gateway that will take care of the cancellation itself)
// - cancelFn is non-nil (it can be nil in tests).
// Calling cancelFn will cancel the context that all infrastructure on this
// node is listening on, so it will shut everything down.
if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && cancelFn != nil {
if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode && cancelFn != nil {
cancelFn()
}
}
Expand Down Expand Up @@ -1045,7 +1045,6 @@ func (s *vectorizedFlowCreator) setupOutput(
// A materializer is a leaf.
s.leaves = append(s.leaves, proc)
s.addMaterializer(proc)
s.materializerAdded = true
default:
return errors.Errorf("unsupported output stream type %s", outputStream.Type)
}
Expand Down

0 comments on commit 7b4bdfb

Please sign in to comment.