Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: Spawn new write task while buf is not empty #6330

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 66 additions & 54 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ struct Inner {

#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(JoinHandle<(Operation, Buf)>),
Idle(Option<(Arc<StdFile>, Buf)>),
Busy(JoinHandle<(Arc<StdFile>, Operation, Buf)>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,10 +234,12 @@ impl File {
/// let file = tokio::fs::File::from_std(std_file);
/// ```
pub fn from_std(std: StdFile) -> File {
let std = Arc::new(std);

File {
std: Arc::new(std),
std: std.clone(),
inner: Mutex::new(Inner {
state: State::Idle(Some(Buf::with_capacity(0))),
state: State::Idle(Some((std, Buf::with_capacity(0)))),
last_write_err: None,
pos: 0,
}),
Expand Down Expand Up @@ -344,8 +346,8 @@ impl File {
let mut inner = self.inner.lock().await;
inner.complete_inflight().await;

let mut buf = match inner.state {
State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
let (std, mut buf) = match inner.state {
State::Idle(ref mut idle_state) => idle_state.take().unwrap(),
_ => unreachable!(),
};

Expand All @@ -355,8 +357,6 @@ impl File {
None
};

let std = self.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
Expand All @@ -366,15 +366,15 @@ impl File {
.map(|()| 0); // the value is discarded later

// Return the result as a seek
(Operation::Seek(res), buf)
(std, Operation::Seek(res), buf)
}));

let (op, buf) = match inner.state {
let (std, op, buf) = match inner.state {
State::Idle(_) => unreachable!(),
State::Busy(ref mut rx) => rx.await?,
};

inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Seek(res) => res.map(|pos| {
Expand Down Expand Up @@ -522,51 +522,49 @@ impl AsyncRead for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
*idle_state = Some((std, buf));
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst);
let std = me.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
(std, Operation::Read(res), buf)
}));
}
State::Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
let (std, op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

match op {
Operation::Read(Ok(_)) => {
buf.copy_to(dst);
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
return Poll::Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());

inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
return Poll::Ready(Err(e));
}
Operation::Write(Ok(())) => {
assert!(buf.is_empty());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
continue;
}
Operation::Write(Err(e)) => {
assert!(inner.last_write_err.is_none());
inner.last_write_err = Some(e.kind());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
}
Operation::Seek(result) => {
assert!(buf.is_empty());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
if let Ok(pos) = result {
inner.pos = pos;
}
Expand All @@ -589,8 +587,8 @@ impl AsyncSeek for File {
io::ErrorKind::Other,
"other file operation is pending, call poll_complete before start_seek",
)),
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

// Factor in any unread data from the buf
if !buf.is_empty() {
Expand All @@ -601,11 +599,9 @@ impl AsyncSeek for File {
}
}

let std = me.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
(std, Operation::Seek(res), buf)
}));
Ok(())
}
Expand All @@ -620,8 +616,8 @@ impl AsyncSeek for File {
match inner.state {
State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {}
Expand Down Expand Up @@ -659,8 +655,8 @@ impl AsyncWrite for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
Expand All @@ -669,16 +665,14 @@ impl AsyncWrite for File {
};

let n = buf.copy_from(src);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
Expand All @@ -689,8 +683,8 @@ impl AsyncWrite for File {
return Poll::Ready(Ok(n));
}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {
Expand Down Expand Up @@ -730,8 +724,8 @@ impl AsyncWrite for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
Expand All @@ -740,16 +734,14 @@ impl AsyncWrite for File {
};

let n = buf.copy_from_bufs(bufs);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
Expand All @@ -760,8 +752,8 @@ impl AsyncWrite for File {
return Poll::Ready(Ok(n));
}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {
Expand Down Expand Up @@ -889,18 +881,38 @@ impl Inner {
return Poll::Ready(Err(e.into()));
}

let (op, buf) = match self.state {
State::Idle(_) => return Poll::Ready(Ok(())),
State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
};
loop {
match &mut self.state {
State::Idle(idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();
// return ok directly if buf is empty.
if buf.is_empty() {
*idle_state = Some((std, buf));
return Poll::Ready(Ok(()));
}

// The buffer is not used here
self.state = State::Idle(Some(buf));
// spawn a new job to flush the buffer.
let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = buf.write_to_partial(&mut &*std);

match op {
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Seek(_) => Poll::Ready(Ok(())),
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
self.state = State::Busy(blocking_task_join_handle);
}
State::Busy(rx) => {
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = State::Idle(Some((std, buf)));

return match op {
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Seek(_) => Poll::Ready(Ok(())),
};
}
};
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/io/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ impl Buf {
self.buf.clear();
res
}

pub(crate) fn write_to_partial<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
if self.is_empty() {
return Ok(());
}

let res = uninterruptibly!(wr.write(self.bytes()));

match res {
Ok(n) => {
self.pos += n;
Ok(())
}
Err(err) => Err(err),
}
}
}

cfg_fs! {
Expand Down