Skip to content

Commit

Permalink
fs: Spawn new write task while buf is not empty
Browse files Browse the repository at this point in the history
Fixes: tokio-rs#6325

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Feb 7, 2024
1 parent 10c9eeb commit bc27f6c
Showing 1 changed file with 66 additions and 55 deletions.
121 changes: 66 additions & 55 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ struct Inner {

#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(JoinHandle<(Operation, Buf)>),
Idle(Option<(Arc<StdFile>, Buf)>),
Busy(JoinHandle<(Arc<StdFile>, Operation, Buf)>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,10 +234,12 @@ impl File {
/// let file = tokio::fs::File::from_std(std_file);
/// ```
pub fn from_std(std: StdFile) -> File {
let std = Arc::new(std);

File {
std: Arc::new(std),
std: std.clone(),
inner: Mutex::new(Inner {
state: State::Idle(Some(Buf::with_capacity(0))),
state: State::Idle(Some((std, Buf::with_capacity(0)))),
last_write_err: None,
pos: 0,
}),
Expand Down Expand Up @@ -344,8 +346,8 @@ impl File {
let mut inner = self.inner.lock().await;
inner.complete_inflight().await;

let mut buf = match inner.state {
State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
let (std, mut buf) = match inner.state {
State::Idle(ref mut idle_state) => idle_state.take().unwrap(),
_ => unreachable!(),
};

Expand All @@ -355,8 +357,6 @@ impl File {
None
};

let std = self.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
Expand All @@ -366,15 +366,15 @@ impl File {
.map(|()| 0); // the value is discarded later

// Return the result as a seek
(Operation::Seek(res), buf)
(std, Operation::Seek(res), buf)
}));

let (op, buf) = match inner.state {
let (std, op, buf) = match inner.state {
State::Idle(_) => unreachable!(),
State::Busy(ref mut rx) => rx.await?,
};

inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Seek(res) => res.map(|pos| {
Expand Down Expand Up @@ -522,51 +522,49 @@ impl AsyncRead for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
*idle_state = Some((std, buf));
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst);
let std = me.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
(std, Operation::Read(res), buf)
}));
}
State::Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
let (std, op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

match op {
Operation::Read(Ok(_)) => {
buf.copy_to(dst);
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
return Poll::Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());

inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
return Poll::Ready(Err(e));
}
Operation::Write(Ok(())) => {
assert!(buf.is_empty());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
continue;
}
Operation::Write(Err(e)) => {
assert!(inner.last_write_err.is_none());
inner.last_write_err = Some(e.kind());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
}
Operation::Seek(result) => {
assert!(buf.is_empty());
inner.state = State::Idle(Some(buf));
inner.state = State::Idle(Some((std, buf)));
if let Ok(pos) = result {
inner.pos = pos;
}
Expand All @@ -589,8 +587,8 @@ impl AsyncSeek for File {
io::ErrorKind::Other,
"other file operation is pending, call poll_complete before start_seek",
)),
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

// Factor in any unread data from the buf
if !buf.is_empty() {
Expand All @@ -601,11 +599,9 @@ impl AsyncSeek for File {
}
}

let std = me.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
(std, Operation::Seek(res), buf)
}));
Ok(())
}
Expand All @@ -620,8 +616,8 @@ impl AsyncSeek for File {
match inner.state {
State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {}
Expand Down Expand Up @@ -659,8 +655,8 @@ impl AsyncWrite for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
Expand All @@ -669,16 +665,14 @@ impl AsyncWrite for File {
};

let n = buf.copy_from(src);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
Expand All @@ -689,8 +683,8 @@ impl AsyncWrite for File {
return Poll::Ready(Ok(n));
}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {
Expand Down Expand Up @@ -730,8 +724,8 @@ impl AsyncWrite for File {

loop {
match inner.state {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
State::Idle(ref mut idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();

let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
Expand All @@ -740,16 +734,14 @@ impl AsyncWrite for File {
};

let n = buf.copy_from_bufs(bufs);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
Expand All @@ -760,8 +752,8 @@ impl AsyncWrite for File {
return Poll::Ready(Ok(n));
}
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some(buf));
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = State::Idle(Some((std, buf)));

match op {
Operation::Read(_) => {
Expand Down Expand Up @@ -889,18 +881,37 @@ impl Inner {
return Poll::Ready(Err(e.into()));
}

let (op, buf) = match self.state {
State::Idle(_) => return Poll::Ready(Ok(())),
State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
};

// The buffer is not used here
self.state = State::Idle(Some(buf));
loop {
match &mut self.state {
State::Idle(idle_state) => {
let (std, mut buf) = idle_state.take().unwrap();
// return ok directly if buf is empty.
if buf.is_empty() {
*idle_state = Some((std, buf));
return Poll::Ready(Ok(()));
}

match op {
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Seek(_) => Poll::Ready(Ok(())),
// spawn a new job to flush the buffer.
let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = buf.write_to(&mut &*std);
(std, Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
self.state = State::Busy(blocking_task_join_handle);
}
State::Busy(rx) => {
let (std, op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = State::Idle(Some((std, buf)));

return match op {
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Seek(_) => Poll::Ready(Ok(())),
};
}
};
}
}
}
Expand Down

0 comments on commit bc27f6c

Please sign in to comment.