Skip to content

Commit

Permalink
fix: proper lifecycle for following logs using consumers (#366)
Browse files Browse the repository at this point in the history
* fix: Following Container Logs feature fixes

* fix the doc: `c.FollowOutput()` MUST be called before
  `c.StartLogProducer()` due to date race
* do not allow multiple `c.StartLogProducer()` without calling a
  `c.StopLogProducer()`
* run `c.StopLogProducer()` in `c.Terminate()` to reduce risk of an
   accidental goroutine leak
* fix tests, write new tests

* fix: handle error when stopping log producer

* fix: discard error in test echo handler

* chore: initialize stop producer in reuse-recreate

* fix: do not terminate a container that has been marked for termination in test

---------

Co-authored-by: Manuel de la Peña <mdelapenya@gmail.com>
  • Loading branch information
slsyy and mdelapenya committed Mar 23, 2023
1 parent ac65ec9 commit dd66783
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 79 deletions.
36 changes: 22 additions & 14 deletions docker.go
Expand Up @@ -239,12 +239,17 @@ func (c *DockerContainer) Stop(ctx context.Context, timeout *time.Duration) erro

// Terminate is used to kill the container. It is usually triggered by as defer function.
func (c *DockerContainer) Terminate(ctx context.Context) error {
err := c.StopLogProducer()
if err != nil {
return err
}

select {
// close reaper if it was created
case c.terminationSignal <- true:
default:
}
err := c.provider.client.ContainerRemove(ctx, c.GetContainerID(), types.ContainerRemoveOptions{
err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
})
Expand Down Expand Up @@ -351,13 +356,7 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
// FollowOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
if c.consumers == nil {
c.consumers = []LogConsumer{
consumer,
}
} else {
c.consumers = append(c.consumers, consumer)
}
c.consumers = append(c.consumers, consumer)
}

// Name gets the name of the container.
Expand Down Expand Up @@ -601,7 +600,13 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt
// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
go func() {
if c.stopProducer != nil {
return errors.New("log producer already started")
}

c.stopProducer = make(chan bool)

go func(stop <-chan bool) {
since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
BEGIN:
Expand All @@ -625,7 +630,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {

for {
select {
case <-c.stopProducer:
case <-stop:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
Expand Down Expand Up @@ -676,15 +681,18 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
}
}
}
}()
}(c.stopProducer)

return nil
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.stopProducer <- true
if c.stopProducer != nil {
c.stopProducer <- true
c.stopProducer = nil
}
return nil
}

Expand Down Expand Up @@ -1037,7 +1045,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
sessionID: testcontainerssession.ID(),
provider: p,
terminationSignal: termSignal,
stopProducer: make(chan bool),
stopProducer: nil,
logger: p.Logger,
}

Expand Down Expand Up @@ -1100,7 +1108,7 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
sessionID: testcontainerssession.ID(),
provider: p,
terminationSignal: termSignal,
stopProducer: make(chan bool),
stopProducer: nil,
logger: p.Logger,
isRunning: c.State == "running",
}
Expand Down
11 changes: 5 additions & 6 deletions docs/features/follow_logs.md
@@ -1,14 +1,11 @@
# Following Container Logs

If you wish to follow container logs, you can set up `LogConsumer`s. The log
following functionality follows a producer-consumer model. You will need to
explicitly start and stop the producer. As logs are written to either `stdout`,
following functionality follows a producer-consumer model. As logs are written to either `stdout`,
or `stderr` (`stdin` is not supported) they will be forwarded (produced) to any
associated `LogConsumer`s. You can associate `LogConsumer`s with the
`.FollowOutput` function.

**Please note** if you start the producer you should always stop it explicitly.

For example, this consumer will just add logs to a slice

```go
Expand All @@ -26,13 +23,13 @@ g := TestLogConsumer{
Msgs: []string{},
}

c.FollowOutput(&g) // must be called before StarLogProducer

err := c.StartLogProducer(ctx)
if err != nil {
// do something with err
}

c.FollowOutput(&g)

// some stuff happens...

err = c.StopLogProducer()
Expand All @@ -41,3 +38,5 @@ if err != nil {
}
```

`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time
184 changes: 125 additions & 59 deletions logconsumer_test.go
Expand Up @@ -27,22 +27,16 @@ type TestLogConsumer struct {
}

func (g *TestLogConsumer) Accept(l Log) {
if string(l.Content) == fmt.Sprintf("echo %s\n", lastMessage) {
s := string(l.Content)
if s == fmt.Sprintf("echo %s\n", lastMessage) {
g.Ack <- true
return
}

g.Msgs = append(g.Msgs, string(l.Content))
g.Msgs = append(g.Msgs, s)
}

func Test_LogConsumerGetsCalled(t *testing.T) {
/*
send one request at a time to a server that will
print whatever was sent in the "echo" parameter, the log
consumer should get all of the messages and append them
to the Msgs slice
*/

ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Expand All @@ -59,55 +53,38 @@ func Test_LogConsumerGetsCalled(t *testing.T) {
}

c, err := GenericContainer(ctx, gReq)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

g := TestLogConsumer{
Msgs: []string{},
Ack: make(chan bool),
}

err = c.StartLogProducer(ctx)
if err != nil {
t.Fatal(err)
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=hello")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=there")
if err != nil {
t.Fatal(err)
}

time.Sleep(10 * time.Second)
require.NoError(t, err)

_, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage))
if err != nil {
t.Fatal(err)
}
_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

select {
case <-g.Ack:
case <-time.After(5 * time.Second):
t.Fatal("never received final log message")
}
_ = c.StopLogProducer()

// get rid of the server "ready" log
g.Msgs = g.Msgs[1:]
assert.Nil(t, c.StopLogProducer())
assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs)

assert.Equal(t, []string{"echo hello\n", "echo there\n"}, g.Msgs)
terminateContainerOnEnd(t, ctx, c)
}

Expand Down Expand Up @@ -142,52 +119,141 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) {
}

c, err := GenericContainer(ctx, gReq)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
terminateContainerOnEnd(t, ctx, c)

ep, err := c.Endpoint(ctx, "http")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

g := TestLogTypeConsumer{
LogTypes: map[string]string{},
Ack: make(chan bool),
}

err = c.StartLogProducer(ctx)
if err != nil {
t.Fatal(err)
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=this-is-stdout")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + "/stderr?echo=this-is-stderr")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = http.Get(ep + fmt.Sprintf("/stdout?echo=%s", lastMessage))
if err != nil {
t.Fatal(err)
}
_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-g.Ack
_ = c.StopLogProducer()
assert.Nil(t, c.StopLogProducer())

assert.Equal(t, map[string]string{
StdoutLog: "echo this-is-stdout\n",
StderrLog: "echo this-is-stderr\n",
}, g.LogTypes)
}

func Test_MultipleLogConsumers(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testresources/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
require.NoError(t, err)

first := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}
second := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}

c.FollowOutput(&first)
c.FollowOutput(&second)

err = c.StartLogProducer(ctx)
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=mlem")
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-first.Ack
<-second.Ack
assert.Nil(t, c.StopLogProducer())

assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs)
assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs)
assert.Nil(t, c.Terminate(ctx))
}

func Test_StartStop(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testresources/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

ep, err := c.Endpoint(ctx, "http")
require.NoError(t, err)

g := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)}

c.FollowOutput(&g)

require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started")
require.NoError(t, c.StartLogProducer(ctx))
require.Error(t, c.StartLogProducer(ctx), "log producer is already started")

_, err = http.Get(ep + "/stdout?echo=mlem")
require.NoError(t, err)

require.NoError(t, c.StopLogProducer())
require.NoError(t, c.StartLogProducer(ctx))

_, err = http.Get(ep + "/stdout?echo=mlem2")
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=" + lastMessage)
require.NoError(t, err)

<-g.Ack
// Do not close producer here, let's delegate it to c.Terminate

assert.Equal(t, []string{
"ready\n",
"echo mlem\n",
"ready\n",
"echo mlem\n",
"echo mlem2\n",
}, g.Msgs)
assert.Nil(t, c.Terminate(ctx))
}

func TestContainerLogWithErrClosed(t *testing.T) {
if providerType == ProviderPodman {
t.Skip("Docker-in-Docker does not work with rootless Podman")
Expand Down
1 change: 1 addition & 0 deletions testresources/echoserver.go
Expand Up @@ -24,6 +24,7 @@ func echoHandler(destination *os.File) http.HandlerFunc {
l := log.New(destination, "echo ", 0)

l.Println(echo)
_ = destination.Sync()

rw.WriteHeader(http.StatusAccepted)
}
Expand Down

0 comments on commit dd66783

Please sign in to comment.