From 23958e5af9cb0992a19c9b5eddd24b0ef60d3fa6 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 18 Mar 2021 18:19:22 -0700 Subject: [PATCH] colflow: fix emitting of flow stats and simplify the creator a bit We introduced `isGatewayNode` boolean on the flow creator a couple of months ago which is used to decide whether the last outbox should emit the flow level memory and disk stats. However, we forget to properly initialize it in the constructor, and this commit fixes that issue. The impact though is very limited - we would just emit some redundant stats, and because they are used for maximum, they would get ignored. Additionally, this commit simplifies the flow creator by removing `materializerAdded` field because it is essentially a duplicate of `isGatewayNode`. Release note: None --- pkg/sql/colflow/vectorized_flow.go | 31 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 3139c8e6fa1b..fa4e623ab768 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -382,7 +382,8 @@ type flowCreatorHelper interface { // accumulateAsyncComponent stores a component (either a router or an outbox) // to be run asynchronously. accumulateAsyncComponent(runFn) - // addMaterializer adds a materializer to the flow. + // addMaterializer adds a root materializer to the flow. This is only done + // on the gateway node. addMaterializer(*colexec.Materializer) // getCancelFlowFn returns a flow cancellation function. getCancelFlowFn() context.CancelFunc @@ -450,18 +451,16 @@ type vectorizedFlowCreator struct { exprHelper *colexecargs.ExprHelper typeResolver descs.DistSQLTypeResolver - // numOutboxes counts how many exec.Outboxes have been set up on this node. - // It must be accessed atomically. - numOutboxes int32 - materializerAdded bool - - // numOutboxesExited is an atomic that keeps track of how many outboxes have exited. - // When numOutboxesExited equals numOutboxes, the cancellation function for the flow - // is called. + // numOutboxes counts how many colrpc.Outbox'es have been set up on this + // node. It must be accessed atomically. + numOutboxes int32 + // numOutboxesExited is an atomic that keeps track of how many outboxes have + // exited. When numOutboxesExited equals numOutboxes, the cancellation + // function for the flow is called on the non-gateway nodes. numOutboxesExited int32 - // numOutboxesDrained is an atomic that keeps track of how many outboxes have - // been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is - // added to a flow-level span. + // numOutboxesDrained is an atomic that keeps track of how many outboxes + // have been drained. When numOutboxesDrained equals numOutboxes, flow-level + // metadata is added to a flow-level span on the non-gateway nodes. numOutboxesDrained int32 // procIdxQueue is a queue of indices into processorSpecs (the argument to @@ -525,6 +524,7 @@ func newVectorizedFlowCreator( streamIDToInputOp: creator.streamIDToInputOp, streamIDToSpecIdx: creator.streamIDToSpecIdx, recordingStats: recordingStats, + isGatewayNode: isGatewayNode, waitGroup: waitGroup, syncFlowConsumer: syncFlowConsumer, nodeDialer: nodeDialer, @@ -665,12 +665,12 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( // When the last Outbox on this node exits, we want to make sure that // everything is shutdown; namely, we need to call cancelFn if: // - it is the last Outbox - // - there is no root materializer on this node (if it were, it would take - // care of the cancellation itself) + // - the node is not the gateway (there is a root materializer on the + // gateway that will take care of the cancellation itself) // - cancelFn is non-nil (it can be nil in tests). // Calling cancelFn will cancel the context that all infrastructure on this // node is listening on, so it will shut everything down. - if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && cancelFn != nil { + if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode && cancelFn != nil { cancelFn() } } @@ -1045,7 +1045,6 @@ func (s *vectorizedFlowCreator) setupOutput( // A materializer is a leaf. s.leaves = append(s.leaves, proc) s.addMaterializer(proc) - s.materializerAdded = true default: return errors.Errorf("unsupported output stream type %s", outputStream.Type) }