"panic: sync: negative WaitGroup counter" in PartitionProcessor.VisitValues #433

Open
akshatraika-moment opened this issue Sep 19, 2023 · 8 comments

Comments

@akshatraika-moment

Hi, I'm seeing a panic loop in https://github.com/lovoo/goka/blob/master/partition_processor.go#L685. I am using version 1.1.7.

Here is the stack trace:

panic: sync: negative WaitGroup counter
goroutine 1087 [running]:
sync.(*WaitGroup).Add(0xc018024e48?, 0xc018024d7c?)
	/usr/local/go/src/sync/waitgroup.go:83 +0xda
sync.(*WaitGroup).Done(...)
	/usr/local/go/src/sync/waitgroup.go:108
github.com/lovoo/goka.(*PartitionProcessor).VisitValues.func1(...)
	/go/pkg/mod/github.com/lovoo/goka@v1.1.7/partition_processor.go:701
github.com/lovoo/goka.(*PartitionProcessor).VisitValues(0xc000a20500, {0x1607528, 0xc0178d4c40}, {0x1270c8a, 0x8}, {0x1124020?, 0xc00062d560}, 0xc0188d0550)
	/go/pkg/mod/github.com/lovoo/goka@v1.1.7/partition_processor.go:716 +0x69a
github.com/lovoo/goka.(*Processor).VisitAllWithStats.func1()
	/go/pkg/mod/github.com/lovoo/goka@v1.1.7/processor.go:934 +0x39
github.com/lovoo/goka/multierr.(*ErrGroup).Go.func1()
	/go/pkg/mod/github.com/lovoo/goka@v1.1.7/multierr/errgroup.go:48 +0x52
golang.org/x/sync/errgroup.(*Group).Go.func1()
	/go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:75 +0x64
created by golang.org/x/sync/errgroup.(*Group).Go
	/go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:72 +0xa5

There is possibly a race condition in the VisitValues function. Can someone please take a look?

Happy to provide more information if needed but this seems to be independent of my implementation.

@mikemoment

Can someone please take a look at this issue? It caused a failure in our production env. Thanks!

@asoliman20

+1

I'm also having this issue!

@frairon
Contributor

frairon commented Sep 20, 2023

Could you provide some information on how you use the visit functionality? For example, how is it initialized, what is done inside the visit function, and so on?

@akshatraika-moment
Author

akshatraika-moment commented Sep 21, 2023

Hi @frairon, thanks for the quick response. For sure, here is how we have initialized and used it:
main =>

func main_runner() error {
	if err := config.Init(); err != nil {
		if errors.Is(err, flag.ErrHelp) {
			return nil
		}

		return fmt.Errorf("loading configuration: %w", err)
	}

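	ctx := context.Background()
	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// g is not declared in the snippet as posted; assumed here to be an
	// errgroup bound to the signal-aware context.
	g, ctx := errgroup.WithContext(ctx)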

	if err := database.Init(ctx); err != nil {
		return fmt.Errorf("setting up database: %w", err)
	}

	if err := app.Init(ctx, g); err != nil {
		return fmt.Errorf("in application: %w", err)
	}

	return g.Wait()
}

app.New() =>

// New creates a new instance of the stream processing application.
func New(brokers []string, opts ...Option) (*App, error) {
	// Default options
	opt := &options{
		groupName:        "servicename",
		inputTopic:       "topicname",
		logger:           zap.NewNop(),
		processCallback:  DefaultProcessCallback,
		snapshotInterval: 10 * time.Second,
	}

	// Update defaults
	opt.apply(opts...)

	app := &App{
		logger:           opt.logger,
		snapshotInterval: opt.snapshotInterval,
	}

	// Define the group graph
	graph := goka.DefineGroup(
		opt.groupName,
		goka.Input(opt.inputTopic, orderbook.DeltaCodec{}, process),
		goka.Visitor("snapshot", app.snapshot),
		goka.Persist(orderbook.Codec{}),
	)

	// Create processor
	proc, err := goka.NewProcessor(brokers, graph, opt.processorOptions...)
	if err != nil {
		return nil, err
	}

	app.processor = proc
	return app, nil
}

app.Run() =>

func (app *App) Run(ctx context.Context, db database.Interface) error {
	if err := app.setupMetrics(); err != nil {
		return fmt.Errorf("setting up metrics: %w", err)
	}

	g, ctx := errgroup.WithContext(ctx)
	sw := database.NewSafeAsyncBatchSnapshotWriter(db, 1000)
	g.Go(func() error {
		return sw.Run(ctx)
	})

	g.Go(func() error {
		return app.processor.Run(ctx)
	})

	// Snapshotter thread.
	g.Go(func() error {
		app.logger.Info("snapshotter started")
		defer app.logger.Info("snapshotter stopped")

		for {
			select {
			case <-ctx.Done():
				return nil
			default:
			}

			// Trigger the visitor callback, which writes a snapshot to the db
			visited, err := app.processor.VisitAllWithStats(ctx, "snapshot", sw)
			if err != nil {
				app.logger.Error("error while snapshotting", zap.Error(err))
			}

			// Flush the async snapshot writer between rounds of snapshotting.
			if err := sw.Flush(context.TODO()); err != nil {
				app.logger.Error("error flushing snapshot", zap.Error(err))
			}

			app.logger.Info("visit complete", zap.Int64("num_visited", visited))

			// Sleep for a few seconds before taking another round of snapshots.
			sleep, cancel := context.WithTimeout(ctx, app.snapshotInterval)
			<-sleep.Done()
			cancel()
		}
	})

	return g.Wait()
}

app.Init() =>

// Init sets up and runs the stream processing application, including the
// snapshotter thread, which will be restarted each time the underlying Kafka
// consumer group rebalances.
func Init(ctx context.Context, g *errgroup.Group) error {
	cfg := goka.DefaultConfig()
	kafka := config.GetKafkaConfig()
	if kafka.AuthMechanism == config.KafkaAuthIAM {
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.Mechanism = sarama.SASLTypeAWSMSKIAM
		cfg.Net.SASL.AWSMSKIAM = sarama.AWSMSKIAMConfig{Region: "ignored"}
	}

	if kafka.UseTLS {
		cfg.Net.TLS.Enable = true
	}

	// Set up application
	goka.ReplaceGlobalConfig(cfg)
	meter := otel.Meter(pkgName)
	tracer = otel.Tracer(pkgName)
	messagesProcessed, _ = meter.Int64Counter("input.messages_processed")
	logger := zap.L()
	db := database.GetDatabaseInstance()
	opts := []Option{
		WithLogger(logger),
		WithProcessCallback(process),
		WithProcessorOptions(
			goka.WithConsumerGroupBuilder(consumerGroupBuilder()),
			goka.WithStorageBuilder(storage.MemoryBuilder()),
			goka.WithTopicManagerBuilder(
				topicManagerBuilder(kafka.TableReplicationFactor),
			),
		),
	}

	if kafka.GroupName != "" {
		opts = append(opts, WithGroupName(kafka.GroupName))
	}

	if kafka.InputTopic != "" {
		opts = append(opts, WithInputTopic(kafka.InputTopic))
	}

	app, err := New(kafka.BootstrapServers, opts...)
	if err != nil {
		logger.Error("failed to create application", zap.Error(err))
		return fmt.Errorf("creating application: %w", err)
	}

	g.Go(func() error {
		if err := app.Run(ctx, db); err != nil {
			logger.Error("error in application", zap.Error(err))
			return fmt.Errorf("running application: %w", err)
		}

		return nil
	})

	return nil
}

Inside the visit function, we are just writing a snapshot of the results to Postgres. Our processor keeps an incremental record of the data we get and does some ETL into an in-memory store struct. I am a bit hesitant to give a lot of information on the internals of those processors and visitors, since this is business IP.

But I think the issue is independent of what is happening inside that function, right? The wait group counter should be resilient to every situation except a segfault. IMO, the error lies in the way wg.Done() is being called in the library, in partition_processor.go.

Please let me know if you need more information. Appreciate the help on this!

@frairon
Contributor

frairon commented Sep 22, 2023

Hey @akshatraika-moment,

thanks a lot for the detailed information! No need to share more, especially not sensitive information - don't worry :).

We did indeed find something that might have caused this behavior under two conditions:

  • the visit function is slower than the state-iterator
  • the visit function panics on the inside.

For some background:
Inside the processor, the visit function is draining the visit-channel to clean up un-visited items. This happens if either the visit-context was closed or the processor is shutting down.
We think that the bug was triggered when the processor was shutting down due to a panic inside the visit-callback, which would close the channel. The draining function failed to check for the channel-state so it would call wg.Done() in an infinite loop causing it to crash.
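To illustrate the failure mode described above, here is a minimal sketch. It is not goka's code: the function names, the `int` channel, and the `maxIter` bound are assumptions made only so the example is self-contained and terminates. It shows why receiving from a closed channel without checking the second return value leads to one `wg.Done()` too many, and how checking the channel state avoids it.

```go
package main

import (
	"fmt"
	"sync"
)

// newPending builds a closed channel holding n pending items, plus a
// WaitGroup whose counter is n (one Add per pending item).
func newPending(n int) (chan int, *sync.WaitGroup) {
	ch := make(chan int, n)
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		ch <- i
	}
	close(ch)
	return ch, wg
}

// drainUnchecked ignores the channel state. Once the buffered items are
// consumed, a receive on the closed channel still returns immediately with
// the zero value, so wg.Done() runs again and the counter goes negative.
// maxIter bounds the loop and the panic is recovered so the sketch can
// report it instead of crashing.
func drainUnchecked(ch <-chan int, wg *sync.WaitGroup, maxIter int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	for i := 0; i < maxIter; i++ {
		<-ch
		wg.Done()
	}
	return nil
}

// drainChecked stops when the channel is closed and empty (ok == false) or
// when nothing is currently pending (default), so wg.Done() runs exactly
// once per drained item.
func drainChecked(ch <-chan int, wg *sync.WaitGroup) int {
	drained := 0
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return drained
			}
			wg.Done()
			drained++
		default:
			return drained
		}
	}
}

func main() {
	ch, wg := newPending(2)
	fmt.Println("unchecked:", drainUnchecked(ch, wg, 10))

	ch, wg = newPending(2)
	fmt.Println("checked drained:", drainChecked(ch, wg))
	wg.Wait() // returns immediately: the counter is back at zero
}
```

In the unchecked variant the third receive succeeds on the closed channel, which is the same "negative WaitGroup counter" panic as in the stack trace.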

But that also means there should be an error logged somewhere in your application's logs. Or maybe it got swallowed by the panic before it could surface.

Anyway, this PR should fix the issue, hopefully in your case as well. Would you be able to test it before we release it, or are you dependent on a proper release?

Just as a note on Visit in general: this functionality is specifically meant to "tap" into the processor event loop so that the processor state can be modified while the processor is running. It's mainly used for migrating, fixing, or cleaning up something that can't be done from the outside. It's a kind of open-heart surgery, which is also why the whole processor will shut down if an error occurs in the visit-callback.
If you can and your use case allows it, it's much safer to create a new View on the processor table and use view.Iterator() to do whatever needs to be done with the data.
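As a rough sketch of what a read-only snapshot pass over an iterator could look like, here is a self-contained example. The `tableIterator` interface and `memIterator` type are local stand-ins invented for illustration (an iterator with Next/Key/Value/Release); they are not goka's types, and the real signatures of the View and its iterator should be checked against goka's documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// tableIterator is a hypothetical stand-in for a read-only table iterator.
type tableIterator interface {
	Next() bool
	Key() string
	Value() (interface{}, error)
	Release()
}

// memIterator walks a fixed in-memory table in key order.
type memIterator struct {
	keys     []string
	data     map[string]interface{}
	pos      int
	released bool
}

func newMemIterator(data map[string]interface{}) *memIterator {
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return &memIterator{keys: keys, data: data, pos: -1}
}

func (it *memIterator) Next() bool {
	if it.pos+1 >= len(it.keys) {
		return false
	}
	it.pos++
	return true
}

func (it *memIterator) Key() string { return it.keys[it.pos] }

func (it *memIterator) Value() (interface{}, error) {
	return it.data[it.keys[it.pos]], nil
}

func (it *memIterator) Release() { it.released = true }

// snapshotAll passes every entry to write, stops at the first error, and
// always releases the iterator. It returns the number of entries written.
func snapshotAll(it tableIterator, write func(key string, value interface{}) error) (int, error) {
	defer it.Release()
	n := 0
	for it.Next() {
		val, err := it.Value()
		if err != nil {
			return n, fmt.Errorf("reading %q: %w", it.Key(), err)
		}
		if err := write(it.Key(), val); err != nil {
			return n, fmt.Errorf("writing %q: %w", it.Key(), err)
		}
		n++
	}
	return n, nil
}

func main() {
	it := newMemIterator(map[string]interface{}{"b": 2, "a": 1})
	n, err := snapshotAll(it, func(k string, v interface{}) error {
		fmt.Println(k, v) // a real writer would persist the entry instead
		return nil
	})
	fmt.Println(n, err)
}
```

The point of this shape is that an error in the write callback only ends the snapshot pass and is returned to the caller; nothing here runs inside the processor's event loop.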

@akshatraika-moment
Author

Thanks @frairon, I looked at the fix you made, although I am not sure it is going to fix the problem entirely. I found more on Friday: I think there might be a race condition somewhere in the WaitGroups when the CPU is overloaded. My hypothesis is that there is a correlation between the CPU killing goroutines and the wg counter losing track. Here are 2 graphs.
[Two images: a histogram of the panic logs and a graph of the service's CPU usage]

The histogram is the graph of the panic logs and the other graph is the CPU usage in my service. As you can see, they look very similar.

Unfortunately, the only place where we see the error is in production so we will not be able to test your PR there. But once you merge it in, we can give it a try. Right now, I am trying to move over the service to using views - thanks for suggesting that.

frairon added a commit that referenced this issue Sep 27, 2023
* issue 433: bugfix on message drain for panic in visits
@frairon
Contributor

frairon commented Sep 27, 2023

Hmm, I don't think the CPU is killing goroutines. The high CPU usage is probably caused by the iteration itself, because right now there is no rate control when iterating the state. So I'm not sure what the actual cause is.
But anyway, the fix is merged and released in v1.1.10.
Maybe you can check it out and see if it solved your issue.

Thanks!

@frairon
Contributor

frairon commented Oct 17, 2023

@akshatraika-moment does the new version fix your issue?
