diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 3139c8e6fa1b..fa4e623ab768 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -382,7 +382,8 @@ type flowCreatorHelper interface { // accumulateAsyncComponent stores a component (either a router or an outbox) // to be run asynchronously. accumulateAsyncComponent(runFn) - // addMaterializer adds a materializer to the flow. + // addMaterializer adds a root materializer to the flow. This is only done + // on the gateway node. addMaterializer(*colexec.Materializer) // getCancelFlowFn returns a flow cancellation function. getCancelFlowFn() context.CancelFunc @@ -450,18 +451,16 @@ type vectorizedFlowCreator struct { exprHelper *colexecargs.ExprHelper typeResolver descs.DistSQLTypeResolver - // numOutboxes counts how many exec.Outboxes have been set up on this node. - // It must be accessed atomically. - numOutboxes int32 - materializerAdded bool - - // numOutboxesExited is an atomic that keeps track of how many outboxes have exited. - // When numOutboxesExited equals numOutboxes, the cancellation function for the flow - // is called. + // numOutboxes counts how many colrpc.Outbox'es have been set up on this + // node. It must be accessed atomically. + numOutboxes int32 + // numOutboxesExited is an atomic that keeps track of how many outboxes have + // exited. When numOutboxesExited equals numOutboxes, the cancellation + // function for the flow is called on the non-gateway nodes. numOutboxesExited int32 - // numOutboxesDrained is an atomic that keeps track of how many outboxes have - // been drained. When numOutboxesDrained equals numOutboxes, flow-level metadata is - // added to a flow-level span. + // numOutboxesDrained is an atomic that keeps track of how many outboxes + // have been drained. When numOutboxesDrained equals numOutboxes, flow-level + // metadata is added to a flow-level span on the non-gateway nodes. numOutboxesDrained int32 // procIdxQueue is a queue of indices into processorSpecs (the argument to @@ -525,6 +524,7 @@ func newVectorizedFlowCreator( streamIDToInputOp: creator.streamIDToInputOp, streamIDToSpecIdx: creator.streamIDToSpecIdx, recordingStats: recordingStats, + isGatewayNode: isGatewayNode, waitGroup: waitGroup, syncFlowConsumer: syncFlowConsumer, nodeDialer: nodeDialer, @@ -665,12 +665,12 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( // When the last Outbox on this node exits, we want to make sure that // everything is shutdown; namely, we need to call cancelFn if: // - it is the last Outbox - // - there is no root materializer on this node (if it were, it would take - // care of the cancellation itself) + // - the node is not the gateway (there is a root materializer on the + // gateway that will take care of the cancellation itself) // - cancelFn is non-nil (it can be nil in tests). // Calling cancelFn will cancel the context that all infrastructure on this // node is listening on, so it will shut everything down. - if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.materializerAdded && cancelFn != nil { + if atomic.AddInt32(&s.numOutboxesExited, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode && cancelFn != nil { cancelFn() } } @@ -1045,7 +1045,6 @@ func (s *vectorizedFlowCreator) setupOutput( // A materializer is a leaf. s.leaves = append(s.leaves, proc) s.addMaterializer(proc) - s.materializerAdded = true default: return errors.Errorf("unsupported output stream type %s", outputStream.Type) }