Skip to content

Commit

Permalink
Replace AsyncSeek trait by AsyncSeekForward for Reader to address
Browse files Browse the repository at this point in the history
bevyengine#12880 (bevyengine#14194)

# Objective

The primary motivation behind this PR is to (partially?) address the
limitations imposed by the recently added `AsyncSeek` trait bound
discussed in issue bevyengine#12880. While the `AsyncSeek` trait add some
flexibility to the reader, it inadvertently restricts the ability to
write asset readers that can truly stream bytes, particularly in
scenarios like HTTP requests where backward seeking is not supported. It
is also challenging in contexts where assets are stored in compressed
formats or require other kinds of transformations.

The logic behind this change is that currently, with `AsyncSeek`, an
asset Reader based on streamed data will either 1) fail silently, 2)
return an error, or 3) use a buffer to satisfy the trait constraint. I
believe that being able to advance in the file without having to "read"
it is a good thing. The only issue here is the ability to seek backward.
It is highly likely that in this context, we only need to seek forward
in the file because we would have already read an entry table upstream
and just want to access one or more resources further in the file. I
understand that in some cases, this may not be applicable, but I think
it is more beneficial not to constrain `Reader`s that want to stream
than to allow "Assets" to read files in a completely arbitrary order.

## Solution

Replace the current `AsyncSeek` trait with `AsyncSeekForward` on asset
`Reader`

## Changelog

- Introduced a new custom trait, `AsyncSeekForward`, for the asset
Reader.
- Replaced the current `AsyncSeek` trait with `AsyncSeekForward` for all
asset `Reader` implementations.

## Migration Guide

Replace all instances of `AsyncSeek` with `AsyncSeekForward` in your
asset reader implementations.
  • Loading branch information
ldubos authored Oct 1, 2024
1 parent de888a3 commit f08f077
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 96 deletions.
25 changes: 23 additions & 2 deletions crates/bevy_asset/src/io/file/file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};
use async_fs::{read_dir, File};
use futures_io::AsyncSeek;
use futures_lite::StreamExt;

use core::{pin::Pin, task, task::Poll};
use std::path::Path;

use super::{FileAssetReader, FileAssetWriter};

impl AsyncSeekForward for File {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
let offset: Result<i64, _> = offset.try_into();

if let Ok(offset) = offset {
Pin::new(&mut self).poll_seek(cx, futures_io::SeekFrom::Current(offset))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}

impl Reader for File {}

impl AssetReader for FileAssetReader {
Expand Down
16 changes: 9 additions & 7 deletions crates/bevy_asset/src/io/file/sync_file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::Stream;

use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};

use core::{pin::Pin, task::Poll};
Expand All @@ -29,14 +29,16 @@ impl AsyncRead for FileReader {
}
}

impl AsyncSeek for FileReader {
fn poll_seek(
impl AsyncSeekForward for FileReader {
fn poll_seek_forward(
self: Pin<&mut Self>,
_cx: &mut core::task::Context<'_>,
pos: std::io::SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let this = self.get_mut();
let seek = this.0.seek(pos);
let current = this.0.stream_position()?;
let seek = this.0.seek(std::io::SeekFrom::Start(current + offset));

Poll::Ready(seek)
}
}
Expand Down
44 changes: 13 additions & 31 deletions crates/bevy_asset/src/io/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use alloc::sync::Arc;
use bevy_utils::HashMap;
use core::{pin::Pin, task::Poll};
use futures_io::{AsyncRead, AsyncSeek};
use futures_io::AsyncRead;
use futures_lite::{ready, Stream};
use parking_lot::RwLock;
use std::{
io::SeekFrom,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use super::AsyncSeekForward;

#[derive(Default, Debug)]
struct DirInternal {
Expand Down Expand Up @@ -247,37 +246,20 @@ impl AsyncRead for DataReader {
}
}

impl AsyncSeek for DataReader {
fn poll_seek(
impl AsyncSeekForward for DataReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut core::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self
.data
.value()
.len()
.try_into()
.map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
108 changes: 60 additions & 48 deletions crates/bevy_asset/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::{ready, Stream};
use std::{
io::SeekFrom,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use thiserror::Error;

/// Errors that occur while loading assets.
Expand Down Expand Up @@ -83,13 +80,51 @@ pub const STACK_FUTURE_SIZE: usize = 10 * size_of::<&()>();

pub use stackfuture::StackFuture;

/// Asynchronously advances the cursor position by a specified number of bytes.
///
/// This trait is a simplified version of the [`futures_io::AsyncSeek`] trait, providing
/// support exclusively for the [`futures_io::SeekFrom::Current`] variant. It allows for relative
/// seeking from the current cursor position.
pub trait AsyncSeekForward {
/// Attempts to asynchronously seek forward by a specified number of bytes from the current cursor position.
///
/// Seeking beyond the end of the stream is allowed and the behavior for this case is defined by the implementation.
/// The new position, relative to the beginning of the stream, should be returned upon successful completion
/// of the seek operation.
///
/// If the seek operation completes successfully,
/// the new position relative to the beginning of the stream should be returned.
///
/// # Implementation
///
/// Implementations of this trait should handle [`Poll::Pending`] correctly, converting
/// [`std::io::ErrorKind::WouldBlock`] errors into [`Poll::Pending`] to indicate that the operation is not
/// yet complete and should be retried, and either internally retry or convert
/// [`std::io::ErrorKind::Interrupted`] into another error kind.
fn poll_seek_forward(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>>;
}

impl<T: ?Sized + AsyncSeekForward + Unpin> AsyncSeekForward for Box<T> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
Pin::new(&mut **self).poll_seek_forward(cx, offset)
}
}

/// A type returned from [`AssetReader::read`], which is used to read the contents of a file
/// (or virtual file) corresponding to an asset.
///
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeek`].
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeekForward`].
/// The only reason a blanket implementation is not provided for applicable types is to allow
/// implementors to override the provided implementation of [`Reader::read_to_end`].
pub trait Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync {
pub trait Reader: AsyncRead + AsyncSeekForward + Unpin + Send + Sync {
/// Reads the entire contents of this reader and appends them to a vec.
///
/// # Note for implementors
Expand Down Expand Up @@ -559,32 +594,20 @@ impl AsyncRead for VecReader {
}
}

impl AsyncSeek for VecReader {
fn poll_seek(
impl AsyncSeekForward for VecReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -644,32 +667,21 @@ impl<'a> AsyncRead for SliceReader<'a> {
}
}

impl<'a> AsyncSeek for SliceReader<'a> {
fn poll_seek(
impl<'a> AsyncSeekForward for SliceReader<'a> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
16 changes: 8 additions & 8 deletions crates/bevy_asset/src/io/processor_gated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use alloc::sync::Arc;
use async_lock::RwLockReadGuardArc;
use bevy_utils::tracing::trace;
use core::{pin::Pin, task::Poll};
use futures_io::{AsyncRead, AsyncSeek};
use std::{io::SeekFrom, path::Path};
use futures_io::AsyncRead;
use std::path::Path;

use super::ErasedAssetReader;
use super::{AsyncSeekForward, ErasedAssetReader};

/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
/// given path until that path has been processed by [`AssetProcessor`].
///
/// [`AssetProcessor`]: crate::processor::AssetProcessor
/// [`AssetProcessor`]: crate::processor::AssetProcessor
pub struct ProcessorGatedReader {
reader: Box<dyn ErasedAssetReader>,
source: AssetSourceId<'static>,
Expand Down Expand Up @@ -142,13 +142,13 @@ impl AsyncRead for TransactionLockedReader<'_> {
}
}

impl AsyncSeek for TransactionLockedReader<'_> {
fn poll_seek(
impl AsyncSeekForward for TransactionLockedReader<'_> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.reader).poll_seek(cx, pos)
Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
}
}

Expand Down

0 comments on commit f08f077

Please sign in to comment.