WriteBatch Method in BatchOutput Interface Does Not Return Errors Anymore #2549

Open
mohitmarwal opened this issue Apr 29, 2024 Discussed in #2548 · 6 comments
Labels: needs more info (an issue that may be a bug or useful feature, but requires more information)

Comments

@mohitmarwal

mohitmarwal commented Apr 29, 2024

Discussed in #2548

Originally posted by mohitmarwal April 29, 2024
Previously, when using the BatchOutput interface in Benthos, the WriteBatch method used to return an error if delivery was not possible. This was crucial for handling error scenarios and responding appropriately, especially in HTTP server contexts where the client expects meaningful error responses.

Earlier, for example, we were getting the message error back in the input (httpServer). Now this error is not returned to the HTTP server; instead we get a request timeout, and the error is printed to the console in a loop.

func (o *BatchOutput) WriteBatch(ctx context.Context, batch types.MessageBatch) error {
	// err is whatever error the delivery attempt produced.
	return fmt.Errorf("message error: %w", err)
}

However, with the latest update, it seems that the WriteBatch method does not return errors anymore. Instead, when an error occurs, the client receives a request timeout response, which is misleading and does not provide useful information about the actual error.

Expected Behavior:

The WriteBatch method in the BatchOutput interface should return an error if delivery is not possible, as it did before. This error should be propagated back to the caller so that appropriate error handling can be performed, such as returning meaningful HTTP responses with error details.
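To make the expected behaviour concrete, here is a minimal standard-library sketch of the propagation being asked for. It does not use the Benthos API: `writeBatchFunc`, `handlerFor`, `failingWrite`, `retryingWrite` and `demo` are hypothetical names, and the 502 and 408 status codes are assumptions chosen for illustration. It only contrasts an output whose error reaches the client with one that retries until the request times out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// writeBatchFunc is a hypothetical stand-in for an output's WriteBatch method.
type writeBatchFunc func(ctx context.Context, batch []string) error

// handlerFor hands the request body to write and reports the outcome to the
// client: the error text on failure, or a timeout once the deadline passes.
func handlerFor(write writeBatchFunc, timeout time.Duration) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		ctx, cancel := context.WithTimeout(r.Context(), timeout)
		defer cancel()

		err = write(ctx, []string{string(body)})
		switch {
		case err == nil:
			w.WriteHeader(http.StatusOK)
		case errors.Is(err, context.DeadlineExceeded):
			http.Error(w, "Request timed out", http.StatusRequestTimeout)
		default:
			http.Error(w, err.Error(), http.StatusBadGateway)
		}
	})
}

// failingWrite reports a delivery failure straight away.
func failingWrite(ctx context.Context, batch []string) error {
	return fmt.Errorf("message error: %w", errors.New("connection refused"))
}

// retryingWrite never reports the failure; it keeps retrying until ctx expires.
func retryingWrite(ctx context.Context, batch []string) error {
	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Pretend a delivery attempt failed and go around again.
		}
	}
}

// post sends payload through h and returns "<status code> <body>".
func post(h http.Handler, payload string) string {
	req := httptest.NewRequest(http.MethodPost, "/test", strings.NewReader(payload))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return fmt.Sprintf("%d %s", rec.Code, strings.TrimSpace(rec.Body.String()))
}

// demo runs both writers through the handler and returns each response.
func demo() (failing, retrying string) {
	failing = post(handlerFor(failingWrite, time.Second), "hello world")
	retrying = post(handlerFor(retryingWrite, 20*time.Millisecond), "hello world")
	return failing, retrying
}
```

In this sketch the first response carries the wrapped error text, which is the behaviour described as "previous" above, while the second only says that the request timed out.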

Steps to Reproduce:

1. Use the BatchOutput interface in Benthos.
2. Implement a custom output that implements this interface.
3. Invoke the WriteBatch method with a scenario where delivery is not possible (e.g., invalid message format, connection failure).
4. Observe that instead of receiving an error, the client gets a request timeout response.

Impact:

This change affects users who rely on the BatchOutput interface for writing batches of messages, especially in scenarios where error handling and response generation are critical, such as HTTP servers.

Proposed Solution:

Restore the previous behavior of the WriteBatch method to return errors when delivery is not possible. This ensures consistency and enables users to handle error scenarios appropriately.

@FerroEduardo

I'm facing a similar problem with the elasticsearch output: I cannot handle the connection error, and the pipeline gets stuck in a loop. The config below never reaches the stdout output:

input:
  generate:
    mapping: 'root = {"hello": "world"}'
    interval: 1s
    count: 1

pipeline:
  processors:
    - log:
        level: INFO
        message: "processing event: ${!content()}"

output:
  broker:
    pattern: fan_out_sequential_fail_fast
    outputs:
      - elasticsearch:
          urls:
            - https://localhost:1234
          index: "my-index"
          id: ${!timestamp_unix()}
          max_retries: 1
          tls:
            enabled: true
            skip_cert_verify: true
          basic_auth:
            enabled: true
            username: elastic
            password: admin
          healthcheck: false
          sniff: false
        processors:
          - mapping: '{"message": "elasticsearch preprocessor", "timestamp": timestamp_unix()}'
      - stdout:
          codec: lines
        processors:
          - mapping: '{"message": "stdout preprocessor", "timestamp": timestamp_unix()}'
Output
INFO Running main config from specified file       @service=benthos benthos_version=unknown path=test-elastic.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Input type generate is now active             @service=benthos label="" path=root.input
INFO Output type elasticsearch is now active       @service=benthos label="" path=root.output.broker.outputs.0
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
INFO Output type stdout is now active              @service=benthos label="" path=root.output.broker.outputs.1
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available  @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"}           @service=benthos label="" path=root.pipeline.processors.0

As the problem seems related to the BatchOutput interface, other components, such as aws_s3, behave in the same way.

@Jeffail
Member

Jeffail commented May 2, 2024

@mohitmarwal when you say "previously", do you mean version 4.26.0? And is there an example you can provide where this behaviour shows up? For example, the following config behaves as I'd expect:

input:
  http_server: {}

output:
  http_client:
    url: example.com/not/going/to/work

Running curl http://localhost:4195/post -d "hello world" returns an error as expected.

@FerroEduardo this is a separate issue. Unfortunately, some components block traffic while they are failing in a connection loop, even when a DLQ is configured. There's an existing issue for this: #1210
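As a rough illustration of why a DLQ may never be reached in that situation, here is a toy model in plain Go. It is not how Benthos is implemented: `sink`, `deliver` and `demoDeliver` are hypothetical names, and the assumption is simply that a fallback only runs after a write returns an error, whereas a connect loop that keeps failing never gets as far as a write.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// sink is a hypothetical stand-in for an output; it is not a Benthos type.
type sink struct {
	connect func() error
	write   func(msg string) error
}

// deliver sends msg to primary and falls back to dlq only when primary's
// write returns an error. While primary.connect keeps failing, deliver stays
// in the connect loop and the fallback is never consulted.
func deliver(ctx context.Context, primary, dlq sink, msg string) (string, error) {
	for {
		if err := primary.connect(); err == nil {
			break
		}
		select {
		case <-ctx.Done():
			return "blocked", ctx.Err()
		case <-time.After(5 * time.Millisecond):
			// Wait briefly, then try to connect again.
		}
	}
	if err := primary.write(msg); err != nil {
		if dlqErr := dlq.write(msg); dlqErr != nil {
			return "failed", dlqErr
		}
		return "dlq", nil
	}
	return "primary", nil
}

// demoDeliver shows both cases: a write error that reaches the DLQ, and a
// connect failure that blocks until the context expires.
func demoDeliver() (writeFails, connectFails string) {
	ok := func() error { return nil }
	refused := func() error { return errors.New("connection refused") }
	dlq := sink{connect: ok, write: func(string) error { return nil }}

	badWrite := sink{connect: ok, write: func(string) error { return errors.New("write failed") }}
	writeFails, _ = deliver(context.Background(), badWrite, dlq, "hello")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()
	badConnect := sink{connect: refused, write: func(string) error { return nil }}
	connectFails, _ = deliver(ctx, badConnect, dlq, "hello")
	return writeFails, connectFails
}
```

Under these assumptions the first case ends in the DLQ, while the second stays blocked until the caller gives up.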

Jeffail added the "needs more info" label May 2, 2024
@mohitmarwal
Author

mohitmarwal commented May 6, 2024

@Jeffail The issue occurred when I updated from versions 4.24.0 and 4.25.1.

@mohitmarwal
Author

mohitmarwal commented May 6, 2024

@Jeffail here is the pseudocode where the issue occurs:

package main

import (
	"context"
	"fmt"

	// Import paths assumed for Benthos v4 at the time of this issue.
	// The blank import registers the standard components (e.g. http_server).
	_ "github.com/benthosdev/benthos/v4/public/components/all"
	"github.com/benthosdev/benthos/v4/public/service"
)

type myBatchOutput struct {
	count int // Just one simple field
}

// Connect is called before any batches are written.
func (m *myBatchOutput) Connect(ctx context.Context) error {
	// Implement your Connect method here
	return nil
}

// WriteBatch is where a delivery error would be returned.
func (m *myBatchOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
	// Implement your WriteBatch method here
	for _, msg := range batch {
		// Process each message
		fmt.Println("Processing message:", msg)
	}
	return nil
}

func (m *myBatchOutput) Close(ctx context.Context) error {
	fmt.Println("disconnected:")
	return nil
}

func newMyBatchOutput(conf *service.ParsedConfig, mgr *service.Resources) (
	output service.BatchOutput,
	batchPolicy service.BatchPolicy,
	maxInFlight int,
	err error,
) {
	output = &myBatchOutput{
		count: 10, // Set a default value or configure from conf
	}
	// Assign a default max in flight
	maxInFlight = 10
	fmt.Println("newbatchoutputfunc:")
	return
}

func main() {
	// Register the BatchOutput implementation, then run the Benthos CLI
	err := service.RegisterBatchOutput("my_batch_output", service.NewConfigSpec(), newMyBatchOutput)
	if err != nil {
		panic(err)
	}
	service.RunCLI(context.Background())
}

input:
  type: http_server
  address: ":80"
  path: "/test"
  http_server:
    path: "/test"
    allowed_verbs:
      - POST
      - GET

pipeline:
  processors: []

output:
  type: plugin
  plugin:
    name: your_custom_plugin_name

output:
  type: stdout

@mohitmarwal
Author

mohitmarwal commented May 6, 2024

@Jeffail in my setup even this doesn't return an error for the "hello world" request; it gives me a request timeout error instead:

input:
  http_server:
    path: /test
output:
  http_client:
    url: https://httpbin.org/hidden-basic-auth/:user/:passwd

MINGW64 ~/Downloads/benthos_4.27.0_windows_amd64.tar/benthos_4.27.0_windows_amd64
$ curl -X POST http://localhost:4195/test -d "hello world"
Request timed out
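When reporting this kind of result it can help to capture the status code along with the body, so that a timeout answered by the server can be told apart from an error response. The following is a small Go sketch of the same request as the curl call above. `postText` and `demoPost` are hypothetical helpers, and the local test server that always answers 408 is only a stand-in for the real endpoint, not a claim about what Benthos returns.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// postText posts payload to url and returns "<status code> <body>".
// The client-side timeout keeps the call from hanging indefinitely.
func postText(url, payload string, timeout time.Duration) (string, error) {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Post(url, "text/plain", strings.NewReader(payload))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%d %s", resp.StatusCode, strings.TrimSpace(string(body))), nil
}

// demoPost runs postText against a local stand-in server that always
// answers with a 408 and the text "Request timed out".
func demoPost() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.Copy(io.Discard, r.Body) // drain the request body before replying
		http.Error(w, "Request timed out", http.StatusRequestTimeout)
	}))
	defer srv.Close()
	return postText(srv.URL+"/test", "hello world", 5*time.Second)
}
```

Against a running instance, `postText("http://localhost:4195/test", "hello world", 10*time.Second)` would be the equivalent call; the 10 second client timeout is an arbitrary choice.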

@mohitmarwal
Author

@Jeffail any updates on this? Please let me know if you need more information.
