
Stopping simultaneously fails #376

Open
frairon opened this issue Apr 4, 2022 · 0 comments
Contributor

frairon commented Apr 4, 2022

In most cases, when multiple processors in the same application (a system test, in this case) are shut down at the same time by closing the runner context, one of them fails to shut down.

This unit test reproduces the behavior:

func TestManyProcs(t *testing.T) {
	var (
		group       goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-manyprocs", time.Now().Unix()))
		inputStream string     = string(group) + "-input"
		joinTable   goka.Table = goka.Table(fmt.Sprintf("%s-join", group))
	)

	brokers := initSystemTest(t)

	tmc := goka.NewTopicManagerConfig()
	tmc.Table.Replication = 1
	cfg := goka.DefaultConfig()
	tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
	require.NoError(t, err)

	err = tm.EnsureStreamExists(inputStream, 20)
	require.NoError(t, err)
	err = tm.EnsureTableExists(string(joinTable), 20)
	require.NoError(t, err)

	require.NoError(t, tm.Close())

	time.Sleep(1 * time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errg, ctx := multierr.NewErrGroup(ctx)
	var procs []*goka.Processor

	inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
	require.NoError(t, err)
	defer inputEmitter.Finish()
	joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
	require.NoError(t, err)
	defer joinEmitter.Finish()
	errg.Go(func() error {
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return nil
			default:
			}
			require.NoError(t, inputEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			require.NoError(t, joinEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			time.Sleep(10 * time.Millisecond)
		}
	})

	createAndRunProc := func(id int) {
		proc, err := goka.NewProcessor(brokers,
			goka.DefineGroup(
				group,
				goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
					ctx.SetValue(msg)
				log.Printf("message")
				}),
				goka.Join(joinTable, new(codec.String)),
				goka.Persist(new(codec.String)),
			),
			goka.WithHotStandby(),
			goka.WithStorageBuilder(storage.MemoryBuilder()),
		)
		require.NoError(t, err)

		errg.Go(func() error {
			log.Printf("Starting processor %d", id)
			err := proc.Run(ctx)
			if err != nil {
				return fmt.Errorf("error running proc %d: %w", id, err)
			}
			return nil
		})
		procs = append(procs, proc)
	}

	for i := 0; i < 20; i++ {
		createAndRunProc(i)
	}

	pollTimed(t, "all procs recovered", 25.0, func() bool {
		for _, proc := range procs {
			if !proc.Recovered() {
				return false
			}
		}
		return true
	})

	time.Sleep(5 * time.Second)

	cancel()
	require.NoError(t, errg.Wait().ErrorOrNil())
}

In production this will usually not occur, but it still can. A first analysis showed that the cause is the final CommitOffset when the ConsumerGroup is shutting down: one of the instances fails, possibly because it is the group leader.
