Skip to content

Commit

Permalink
ToxicStub allow to unblocking write to Output
Browse files Browse the repository at this point in the history
It could happen when Link has no reciever and there is some packets in buffer.
It produces deadlock.

Dynamic test with latest version of toxiproxy
  • Loading branch information
miry committed Sep 9, 2022
1 parent 8e307bb commit 6f3fdcd
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 19 deletions.
11 changes: 10 additions & 1 deletion Makefile
@@ -1,4 +1,5 @@
OS := $(shell uname -s)
ARCH := $(shell uname -m)
GO_VERSION := $(shell go version | cut -f3 -d" ")
GO_MINOR_VERSION := $(shell echo $(GO_VERSION) | cut -f2 -d.)
GO_PATCH_VERSION := $(shell echo $(GO_VERSION) | cut -f3 -d. | sed "s/^\s*$$/0/")
Expand All @@ -13,8 +14,9 @@ test:
$(MALLOC_ENV) go test -v -race -timeout 1m ./...

.PHONY: test-e2e
test-e2e: build
test-e2e: build container.build
scripts/test-e2e
timeout -v --foreground 20m scripts/test-e2e-hazelcast toxiproxy

.PHONY: test-release
test-release: test bench test-e2e release-dry
Expand Down Expand Up @@ -45,6 +47,13 @@ build: dist clean
go build -ldflags="-s -w" -o ./dist/toxiproxy-server ./cmd/server
go build -ldflags="-s -w" -o ./dist/toxiproxy-cli ./cmd/cli

.PHONY: container.build
container.build:
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-server-linux-$(ARCH) ./cmd/server
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-cli-linux-$(ARCH) ./cmd/cli
docker build -f Dockerfile -t toxiproxy dist
docker run --rm toxiproxy --version

.PHONY: release
release:
goreleaser release --rm-dist
Expand Down
4 changes: 2 additions & 2 deletions api.go
Expand Up @@ -27,7 +27,7 @@ func stopBrowsersMiddleware(next http.Handler) http.Handler {
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 30*time.Second, "")
return http.TimeoutHandler(next, 25*time.Second, "")
}

type ApiServer struct {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (server *ApiServer) Listen(host string, port string) {
srv := &http.Server{
Handler: r,
Addr: net.JoinHostPort(host, port),
WriteTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
ReadTimeout: 10 * time.Second,
}

Expand Down
49 changes: 36 additions & 13 deletions link.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -73,7 +74,10 @@ func (link *ToxicLink) Start(
dest io.WriteCloser,
) {
logger := link.Logger
logger.Debug().Msg("Setup connection")
logger.
Debug().
Str("direction", link.Direction()).
Msg("Setup connection")

labels := []string{
link.Direction(),
Expand Down Expand Up @@ -133,23 +137,33 @@ func (link *ToxicLink) read(
func (link *ToxicLink) write(
metricLabels []string,
name string,
server *ApiServer,
server *ApiServer, // TODO: Replace with AppConfig for Metrics and Logger
dest io.WriteCloser,
) {
logger := link.Logger
logger := link.Logger.
With().
Str("component", "ToxicLink").
Str("method", "write").
Str("link", name).
Str("proxy", link.proxy.Name).
Str("link_addr", fmt.Sprintf("%p", link)).
Logger()

bytes, err := io.Copy(dest, link.output)
if err != nil {
logger.Warn().
Int64("bytes", bytes).
Err(err).
Msg("Destination terminated")
}
if server.Metrics.proxyMetricsEnabled() {
Msg("Could not write to destination")
} else if server.Metrics.proxyMetricsEnabled() {
server.Metrics.ProxyMetrics.SentBytesTotal.
WithLabelValues(metricLabels...).Add(float64(bytes))
}

dest.Close()
logger.Trace().Msgf("Remove link %s from ToxicCollection", name)
link.toxics.RemoveLink(name)
logger.Trace().Msgf("RemoveConnection %s from Proxy %s", name, link.proxy.Name)
link.proxy.RemoveConnection(name)
}

Expand Down Expand Up @@ -211,11 +225,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
}
}

log.Trace().Msg("Interrupt the previous toxic to update its output")
log.Trace().Msg("Interrupting the previous toxic to update its output")
stop := make(chan bool)
go func() {
stop <- link.stubs[toxic_index-1].InterruptToxic()
}()
go func(stub *toxics.ToxicStub, stop chan bool) {
stop <- stub.InterruptToxic()
}(link.stubs[toxic_index-1], stop)

// Unblock the previous toxic if it is trying to flush
// If the previous toxic is closed, continue flusing until we reach the end.
Expand All @@ -231,9 +245,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
if !stopped {
<-stop
}
return
return // TODO: There are some steps after this to clean buffer
}

err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
link.stubs[toxic_index].Output <- tmp
}
}

Expand All @@ -244,7 +263,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
link.stubs[toxic_index].Close()
return
}
link.stubs[toxic_index].Output <- tmp
err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
}

link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output
Expand Down
62 changes: 62 additions & 0 deletions link_test.go
Expand Up @@ -247,6 +247,7 @@ func TestStateCreated(t *testing.T) {
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}

link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link
Expand All @@ -261,3 +262,64 @@ func TestStateCreated(t *testing.T) {
t.Fatalf("New toxic did not have state object created.")
}
}

func TestRemoveToxicWithBrokenConnection(t *testing.T) {
ctx := context.Background()

log := zerolog.Nop()
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}
ctx = log.WithContext(ctx)

collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

toxics := [2]*toxics.ToxicWrapper{
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Downstream,
Toxicity: 1,
},
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Upstream,
Toxicity: 1,
},
}

collection.chainAddToxic(toxics[0])
collection.chainAddToxic(toxics[1])

done := make(chan struct{})
defer close(done)

var data uint16 = 42
go func(log zerolog.Logger) {
for {
select {
case <-done:
link.input.Close()
return
case <-time.After(10 * time.Second):
log.Print("Finish load")
return
default:
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, data)
link.input.Write(buf)
}
}
}(log)

collection.chainRemoveToxic(ctx, toxics[0])
collection.chainRemoveToxic(ctx, toxics[1])
}
31 changes: 31 additions & 0 deletions scripts/hazelcast.xml
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>

<hazelcast xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-5.1.xsd">

<properties>
<property name="hazelcast.merge.next.run.delay.seconds">15</property>
<property name="hazelcast.merge.first.run.delay.seconds">20</property>
<property name="hazelcast.partition.migration.chunks.enabled">false</property>
<property name="hazelcast.heartbeat.failuredetector.type">deadline</property>
<property name="hazelcast.heartbeat.interval.seconds">3</property>
<property name="hazelcast.max.no.heartbeat.seconds">10</property>
</properties>

<network>
<public-address>member-proxy:${proxyPort}</public-address>
<port auto-increment="false">5701</port>
<join>
<auto-detection enabled="false"/>
<tcp-ip enabled="true">
<member-list>
<member>member-proxy:${proxyPort0}</member>
<member>member-proxy:${proxyPort1}</member>
<member>member-proxy:${proxyPort2}</member>
</member-list>
</tcp-ip>
</join>
</network>
</hazelcast>
5 changes: 4 additions & 1 deletion scripts/test-e2e
Expand Up @@ -28,6 +28,10 @@ function cleanup() {
}
trap "cleanup" EXIT SIGINT SIGTERM

echo "= Toxiproxy E2E tests"
echo
echo "== Setup"
echo
echo "=== Starting Web service"

pkill -15 "toxiproxy-server" || true
Expand Down Expand Up @@ -56,7 +60,6 @@ cli toggle shopify_http
echo -e "-----------------\n"

echo "== Benchmarking"

echo
echo "=== Without toxics"

Expand Down

0 comments on commit 6f3fdcd

Please sign in to comment.