Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting the content type for multipart uploads #272

Merged
merged 1 commit into from Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 94 additions & 9 deletions s3/src/bucket.rs
Expand Up @@ -834,7 +834,12 @@ impl Bucket {
reader: &mut R,
s3_path: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream(reader, s3_path.as_ref()).await
self._put_object_stream_with_content_type(
reader,
s3_path.as_ref(),
"application/octet-stream",
)
.await
}

#[maybe_async::sync_impl]
Expand All @@ -843,14 +848,89 @@ impl Bucket {
reader: &mut R,
s3_path: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream(reader, s3_path.as_ref())
self._put_object_stream_with_content_type(
reader,
s3_path.as_ref(),
"application/octet-stream",
)
}

/// Stream file from local path to s3, generic over T: Write with explicit content type.
///
/// # Example:
///
/// ```rust,no_run
/// use s3::bucket::Bucket;
/// use s3::creds::Credentials;
/// use anyhow::Result;
/// use std::fs::File;
/// use std::io::Write;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
///
/// let bucket_name = "rust-s3-test";
/// let region = "us-east-1".parse()?;
/// let credentials = Credentials::default()?;
/// let bucket = Bucket::new(bucket_name, region, credentials)?;
/// let path = "path";
/// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
/// let mut file = File::create(path)?;
/// file.write_all(&test)?;
///
/// #[cfg(feature = "with-tokio")]
/// let mut path = tokio::fs::File::open(path).await?;
///
/// #[cfg(feature = "with-async-std")]
/// let mut path = async_std::fs::File::open(path).await?;
/// // Async variant with `tokio` or `async-std` features
/// // Generic over futures_io::AsyncRead|tokio::io::AsyncRead + Unpin
/// let status_code = bucket
/// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")
/// .await?;
///
/// // `sync` feature will produce an identical method
/// #[cfg(feature = "sync")]
/// // Generic over std::io::Read
/// let status_code = bucket
/// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?;
///
/// // Blocking variant, generated with `blocking` feature in combination
/// // with `tokio` or `async-std` features.
/// #[cfg(feature = "blocking")]
/// let status_code = bucket
/// .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?;
/// #
/// # Ok(())
/// # }
/// ```
#[maybe_async::async_impl]
pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
&self,
reader: &mut R,
s3_path: impl AsRef<str>,
content_type: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
.await
}

#[maybe_async::sync_impl]
pub fn put_object_stream_with_content_type<R: Read>(
&self,
reader: &mut R,
s3_path: impl AsRef<str>,
content_type: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
}

#[maybe_async::async_impl]
async fn _put_object_stream<R: AsyncRead + Unpin>(
async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
&self,
reader: &mut R,
s3_path: &str,
content_type: &str,
) -> Result<u16, S3Error> {
// If the file is smaller CHUNK_SIZE, just do a regular upload.
// Otherwise perform a multi-part upload.
Expand All @@ -863,7 +943,7 @@ impl Bucket {
return Ok(code);
}

let command = Command::InitiateMultipartUpload;
let command = Command::InitiateMultipartUpload { content_type };
let request = RequestImpl::new(self, s3_path, command);
let (data, code) = request.response_data(false).await?;
if code >= 300 {
Expand All @@ -885,8 +965,8 @@ impl Bucket {
let command = Command::PutObject {
// part_number,
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (data, _code) = request.response_data(true).await?;
Expand Down Expand Up @@ -923,8 +1003,13 @@ impl Bucket {
}

#[maybe_async::sync_impl]
fn _put_object_stream<R: Read>(&self, reader: &mut R, s3_path: &str) -> Result<u16, S3Error> {
let command = Command::InitiateMultipartUpload;
fn _put_object_stream_with_content_type<R: Read>(
&self,
reader: &mut R,
s3_path: &str,
content_type: &str,
) -> Result<u16, S3Error> {
let command = Command::InitiateMultipartUpload { content_type };
let request = RequestImpl::new(self, s3_path, command);
let (data, code) = request.response_data(false)?;
if code >= 300 {
Expand Down Expand Up @@ -952,8 +1037,8 @@ impl Bucket {
let command = Command::PutObject {
// part_number,
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (data, _code) = request.response_data(true)?;
Expand Down Expand Up @@ -981,8 +1066,8 @@ impl Bucket {
part_number += 1;
let command = Command::PutObject {
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)),
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (data, _code) = request.response_data(true)?;
Expand Down
7 changes: 5 additions & 2 deletions s3/src/command.rs
Expand Up @@ -105,7 +105,9 @@ pub enum Command<'a> {
PresignDelete {
expiry_secs: u32,
},
InitiateMultipartUpload,
InitiateMultipartUpload {
content_type: &'a str,
},
UploadPart {
part_number: u32,
content: &'a [u8],
Expand Down Expand Up @@ -147,7 +149,7 @@ impl<'a> Command<'a> {
| Command::AbortMultipartUpload { .. }
| Command::PresignDelete { .. }
| Command::DeleteBucket => HttpMethod::Delete,
Command::InitiateMultipartUpload | Command::CompleteMultipartUpload { .. } => {
Command::InitiateMultipartUpload { .. } | Command::CompleteMultipartUpload { .. } => {
HttpMethod::Post
}
Command::HeadObject => HttpMethod::Head,
Expand All @@ -174,6 +176,7 @@ impl<'a> Command<'a> {

pub fn content_type(&self) -> String {
match self {
Command::InitiateMultipartUpload { content_type } => content_type.to_string(),
Command::PutObject { content_type, .. } => content_type.to_string(),
Command::CompleteMultipartUpload { .. } => "application/xml".into(),
_ => "text/plain".into(),
Expand Down
2 changes: 1 addition & 1 deletion s3/src/request_trait.rs
Expand Up @@ -200,7 +200,7 @@ pub trait Request {
// Append to url_path
#[allow(clippy::collapsible_match)]
match self.command() {
Command::InitiateMultipartUpload | Command::ListMultipartUploads { .. } => {
Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
url_str.push_str("?uploads")
}
Command::AbortMultipartUpload { upload_id } => {
Expand Down