Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Supports the transfer of any file #6826

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from

Conversation

Hisoka-X
Copy link
Member

@Hisoka-X Hisoka-X commented May 9, 2024

Purpose of this pull request

This PR add new file format binary. With this format, SeaTunnel can transfer any file, such as compressed packages, pictures, etc. We can move files between different environments.
There are a demo of copy file from local to local.

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  LocalFile {
    path = "/seatunnel/read/binary/"
    file_format_type = "binary"
  }
}
sink {
  // you can transfer local file to s3/hdfs/oss etc.
  LocalFile {
    path = "/seatunnel/read/binary2/"
    file_format_type = "binary"
  }
}

Does this PR introduce any user-facing change?

no

How was this patch tested?

add new test.

Check list

@Hisoka-X
Copy link
Member Author

Hisoka-X commented May 9, 2024

cc @TyrantLucifer @hailin0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cute cat ?

@liunaijie
Copy link
Contributor

does this means we can use this feature copy file between different file systems?

@Hisoka-X
Copy link
Member Author

does this means we can use this feature copy file between different file systems?

yep.

Comment on lines +74 to +79
while ((readSize = inputStream.read(buffer)) != -1) {
if (readSize != maxSize) {
buffer = Arrays.copyOf(buffer, readSize);
}
SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
buffer = new byte[1024];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this while loop, a new buffer is reallocated after each read:
buffer = new byte[1024]
It might be better to reuse an existing buffer:
Arrays.fill(buffer, (byte) 0)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, the original version is actually exactly as you described. But buffer is an object and will eventually be used by the sink writer. If we reuse it in advance, the sink will not be able to get the correct data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your reply, it is cool to read and write any file!

Copy link
Contributor

@TaoZex TaoZex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][HDFS File Source and Sink] Whether to support file synchronization function similar to distcp
4 participants