Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFTP input - last file not deleted #2435

Open
joffreychambrin opened this issue Mar 23, 2024 · 2 comments
Open

SFTP input - last file not deleted #2435

joffreychambrin opened this issue Mar 23, 2024 · 2 comments
Labels
bug inputs Any tasks or issues relating specifically to inputs needs investigation It looks as though have all the information needed but investigation is required

Comments

@joffreychambrin
Copy link

Hi 馃憢

I have discovered an issue when the sftp input component is configured with watcher enabled and delete_on_finish enabled.

Steps to reproduce:

  • Upload 10 files to the SFTP
  • Run the benthos pipeline (see below for config)
  • After the files have been consumed by the pipeline, check the SFTP => one file is still there even if it has been consumed by benthos.

Here is the configuration file I am using

cache_resources:
  - label: memory
    memory:
      default_ttl: 5m
      compaction_interval: 60s
      init_values: {}

input:
  sftp:
    address: ${SFTP_HOST}
    credentials:
      username: ${SFTP_USERNAME}
      password: ${SFTP_PASSWORD}
    paths:
      - "folder/**"
    scanner:
      to_the_end: {}
    delete_on_finish: true
    watcher:
      enabled: true
      minimum_age: 1s
      poll_interval: 20ms
      cache: memory

pipeline:
  processors:
    - log:
        level: INFO
        message: "message received"

output:
  label: ""
  stdout:
    codec: lines
@joffreychambrin
Copy link
Author

joffreychambrin commented Mar 23, 2024

After looking at the code, it seems to be related to this code: internal/impl/sftp/input.go

	if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
		_ = s.pathProvider.Ack(ctx, nextPath, aErr)
		if aErr != nil {
			return nil
		}
		if s.deleteOnFinish {
			s.scannerMut.Lock()
			client := s.client
			if client == nil {

The mutex seems to be locked and so the last file is not being deleted

Maybe I am completely wrong, but in this case, do you need the mutex? #2436

@Jeffail Jeffail added bug inputs Any tasks or issues relating specifically to inputs needs investigation It looks as though have all the information needed but investigation is required labels Mar 25, 2024
@Jeffail
Copy link
Member

Jeffail commented Mar 25, 2024

Hey @joffreychambrin, we need some form of mutex there as we can't otherwise guarantee race conditions there. However, it should definitely be possible to fix this by reducing the scope of those locks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug inputs Any tasks or issues relating specifically to inputs needs investigation It looks as though have all the information needed but investigation is required
Projects
None yet
Development

No branches or pull requests

2 participants