Skip to content

Commit

Permalink
Allow setting the content type for multipart uploads (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
ns-sjorgedeaguiar committed Jul 21, 2022
1 parent e547b08 commit f453fc2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 12 deletions.
103 changes: 94 additions & 9 deletions s3/src/bucket.rs
Expand Up @@ -834,7 +834,12 @@ impl Bucket {
reader: &mut R,
s3_path: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream(reader, s3_path.as_ref()).await
self._put_object_stream_with_content_type(
reader,
s3_path.as_ref(),
"application/octet-stream",
)
.await
}

#[maybe_async::sync_impl]
Expand All @@ -843,14 +848,89 @@ impl Bucket {
reader: &mut R,
s3_path: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream(reader, s3_path.as_ref())
self._put_object_stream_with_content_type(
reader,
s3_path.as_ref(),
"application/octet-stream",
)
}

/// Stream file from local path to s3, generic over T: Write with explicit content type.
///
/// # Example:
///
/// ```rust,no_run
/// use s3::bucket::Bucket;
/// use s3::creds::Credentials;
/// use anyhow::Result;
/// use std::fs::File;
/// use std::io::Write;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
///
/// let bucket_name = "rust-s3-test";
/// let region = "us-east-1".parse()?;
/// let credentials = Credentials::default()?;
/// let bucket = Bucket::new(bucket_name, region, credentials)?;
/// let path = "path";
/// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
/// let mut file = File::create(path)?;
/// file.write_all(&test)?;
///
/// #[cfg(feature = "with-tokio")]
/// let mut path = tokio::fs::File::open(path).await?;
///
/// #[cfg(feature = "with-async-std")]
/// let mut path = async_std::fs::File::open(path).await?;
/// // Async variant with `tokio` or `async-std` features
/// // Generic over futures_io::AsyncRead|tokio::io::AsyncRead + Unpin
/// let status_code = bucket
/// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")
/// .await?;
///
/// // `sync` feature will produce an identical method
/// #[cfg(feature = "sync")]
/// // Generic over std::io::Read
/// let status_code = bucket
/// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?;
///
/// // Blocking variant, generated with `blocking` feature in combination
/// // with `tokio` or `async-std` features.
/// #[cfg(feature = "blocking")]
/// let status_code = bucket
/// .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?;
/// #
/// # Ok(())
/// # }
/// ```
#[maybe_async::async_impl]
pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
&self,
reader: &mut R,
s3_path: impl AsRef<str>,
content_type: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
.await
}

#[maybe_async::sync_impl]
pub fn put_object_stream_with_content_type<R: Read>(
&self,
reader: &mut R,
s3_path: impl AsRef<str>,
content_type: impl AsRef<str>,
) -> Result<u16, S3Error> {
self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
}

#[maybe_async::async_impl]
async fn _put_object_stream<R: AsyncRead + Unpin>(
async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
&self,
reader: &mut R,
s3_path: &str,
content_type: &str,
) -> Result<u16, S3Error> {
// If the file is smaller CHUNK_SIZE, just do a regular upload.
// Otherwise perform a multi-part upload.
Expand All @@ -863,7 +943,7 @@ impl Bucket {
return Ok(code);
}

let command = Command::InitiateMultipartUpload;
let command = Command::InitiateMultipartUpload { content_type };
let request = RequestImpl::new(self, s3_path, command);
let (data, code) = request.response_data(false).await?;
if code >= 300 {
Expand All @@ -885,8 +965,8 @@ impl Bucket {
let command = Command::PutObject {
// part_number,
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (chunk_data, chunk_code) = request.response_data(true).await?;
Expand Down Expand Up @@ -936,8 +1016,13 @@ impl Bucket {
}

#[maybe_async::sync_impl]
fn _put_object_stream<R: Read>(&self, reader: &mut R, s3_path: &str) -> Result<u16, S3Error> {
let command = Command::InitiateMultipartUpload;
fn _put_object_stream_with_content_type<R: Read>(
&self,
reader: &mut R,
s3_path: &str,
content_type: &str,
) -> Result<u16, S3Error> {
let command = Command::InitiateMultipartUpload { content_type };
let request = RequestImpl::new(self, s3_path, command);
let (data, code) = request.response_data(false)?;
if code >= 300 {
Expand Down Expand Up @@ -965,8 +1050,8 @@ impl Bucket {
let command = Command::PutObject {
// part_number,
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (chunk_data, chunk_code) = request.response_data(true)?;
Expand Down Expand Up @@ -1006,8 +1091,8 @@ impl Bucket {
part_number += 1;
let command = Command::PutObject {
content: &chunk,
content_type: "application/octet-stream",
multipart: Some(Multipart::new(part_number, upload_id)),
content_type,
};
let request = RequestImpl::new(self, &path, command);
let (chunk_data, chunk_code) = request.response_data(true)?;
Expand Down
7 changes: 5 additions & 2 deletions s3/src/command.rs
Expand Up @@ -105,7 +105,9 @@ pub enum Command<'a> {
PresignDelete {
expiry_secs: u32,
},
InitiateMultipartUpload,
InitiateMultipartUpload {
content_type: &'a str,
},
UploadPart {
part_number: u32,
content: &'a [u8],
Expand Down Expand Up @@ -147,7 +149,7 @@ impl<'a> Command<'a> {
| Command::AbortMultipartUpload { .. }
| Command::PresignDelete { .. }
| Command::DeleteBucket => HttpMethod::Delete,
Command::InitiateMultipartUpload | Command::CompleteMultipartUpload { .. } => {
Command::InitiateMultipartUpload { .. } | Command::CompleteMultipartUpload { .. } => {
HttpMethod::Post
}
Command::HeadObject => HttpMethod::Head,
Expand All @@ -174,6 +176,7 @@ impl<'a> Command<'a> {

pub fn content_type(&self) -> String {
match self {
Command::InitiateMultipartUpload { content_type } => content_type.to_string(),
Command::PutObject { content_type, .. } => content_type.to_string(),
Command::CompleteMultipartUpload { .. } => "application/xml".into(),
_ => "text/plain".into(),
Expand Down
2 changes: 1 addition & 1 deletion s3/src/request_trait.rs
Expand Up @@ -200,7 +200,7 @@ pub trait Request {
// Append to url_path
#[allow(clippy::collapsible_match)]
match self.command() {
Command::InitiateMultipartUpload | Command::ListMultipartUploads { .. } => {
Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
url_str.push_str("?uploads")
}
Command::AbortMultipartUpload { upload_id } => {
Expand Down

0 comments on commit f453fc2

Please sign in to comment.