Merge "Add trait for non-blocking Block IO" into main
diff --git a/gbl/efi/src/utils.rs b/gbl/efi/src/utils.rs
index 386dfcc..f3652a0 100644
--- a/gbl/efi/src/utils.rs
+++ b/gbl/efi/src/utils.rs
@@ -28,7 +28,9 @@
     DeviceHandle, EfiEntry, EventType,
 };
 use fdt::FdtHeader;
-use gbl_storage::{required_scratch_size, AsBlockDevice, AsMultiBlockDevices, BlockIo};
+use gbl_storage::{
+    required_scratch_size, AsBlockDevice, AsMultiBlockDevices, BlockInfo, BlockIo, BlockIoError,
+};
 
 pub const EFI_DTB_TABLE_GUID: EfiGuid =
     EfiGuid::new(0xb1b621d5, 0xf19c, 0x41a5, [0x83, 0x0b, 0xd9, 0x15, 0x2c, 0x69, 0xaa, 0xe0]);
@@ -62,24 +64,32 @@
 pub struct EfiBlockIo<'a>(pub Protocol<'a, BlockIoProtocol>);
 
 impl BlockIo for EfiBlockIo<'_> {
-    fn block_size(&mut self) -> u64 {
-        self.0.media().unwrap().block_size as u64
+    fn info(&mut self) -> BlockInfo {
+        BlockInfo {
+            block_size: self.0.media().unwrap().block_size as u64,
+            num_blocks: (self.0.media().unwrap().last_block + 1) as u64,
+            alignment: core::cmp::max(1, self.0.media().unwrap().io_align as u64),
+        }
     }
 
-    fn num_blocks(&mut self) -> u64 {
-        (self.0.media().unwrap().last_block + 1) as u64
+    fn read_blocks(
+        &mut self,
+        blk_offset: u64,
+        out: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        self.0
+            .read_blocks(blk_offset, out)
+            .map_err(|_| BlockIoError::Others(Some("EFI BLOCK_IO protocol read error")))
     }
 
-    fn alignment(&mut self) -> u64 {
-        core::cmp::max(1, self.0.media().unwrap().io_align as u64)
-    }
-
-    fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> bool {
-        self.0.read_blocks(blk_offset, out).is_ok()
-    }
-
-    fn write_blocks(&mut self, blk_offset: u64, data: &[u8]) -> bool {
-        self.0.write_blocks(blk_offset, data).is_ok()
+    fn write_blocks(
+        &mut self,
+        blk_offset: u64,
+        data: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        self.0
+            .write_blocks(blk_offset, data)
+            .map_err(|_| BlockIoError::Others(Some("EFI BLOCK_IO protocol write error")))
     }
 }
 
diff --git a/gbl/libefi/defs/protocols/block_io_protocol.h b/gbl/libefi/defs/protocols/block_io_protocol.h
index 48a975d..dbbd94d 100644
--- a/gbl/libefi/defs/protocols/block_io_protocol.h
+++ b/gbl/libefi/defs/protocols/block_io_protocol.h
@@ -30,7 +30,7 @@
   EfiStatus (*read_blocks)(EfiBlockIoProtocol* self, uint32_t media_id, uint64_t lba,
                            size_t buffer_size, void* buffer);
   EfiStatus (*write_blocks)(EfiBlockIoProtocol* self, uint32_t media_id, uint64_t lba,
-                            size_t buffer_size, const void* buffer);
+                            size_t buffer_size, void* buffer);
   EfiStatus (*flush_blocks)(EfiBlockIoProtocol* self);
 };
 
diff --git a/gbl/libefi/src/protocol/block_io.rs b/gbl/libefi/src/protocol/block_io.rs
index 2aea44f..e646c26 100644
--- a/gbl/libefi/src/protocol/block_io.rs
+++ b/gbl/libefi/src/protocol/block_io.rs
@@ -51,7 +51,7 @@
     }
 
     /// Wrapper of `EFI_BLOCK_IO_PROTOCOL.write_blocks()`
-    pub fn write_blocks(&self, lba: u64, buffer: &[u8]) -> EfiResult<()> {
+    pub fn write_blocks(&self, lba: u64, buffer: &mut [u8]) -> EfiResult<()> {
         // SAFETY:
         // `self.interface()?` guarantees self.interface is non-null and points to a valid object
         // established by `Protocol::new()`.
@@ -64,7 +64,7 @@
                 self.media()?.media_id,
                 lba,
                 buffer.len(),
-                buffer.as_ptr() as *const _
+                buffer.as_mut_ptr() as _
             )
         }
     }
diff --git a/gbl/libgbl/src/fastboot/mod.rs b/gbl/libgbl/src/fastboot/mod.rs
index 4e97910..dcdb081 100644
--- a/gbl/libgbl/src/fastboot/mod.rs
+++ b/gbl/libgbl/src/fastboot/mod.rs
@@ -79,7 +79,7 @@
         let mut dev = (&mut self.devs).get(self.part.blk_id)?;
         Ok(match self.part.part() {
             "" => dev.write(offset, data),
-            part => dev.write_gpt_partition_mut(part, offset, data),
+            part => dev.write_gpt_partition(part, offset, data),
         }?)
     }
 
diff --git a/gbl/libgbl/src/slots/partition.rs b/gbl/libgbl/src/slots/partition.rs
index fad6cc7..675b0c9 100644
--- a/gbl/libgbl/src/slots/partition.rs
+++ b/gbl/libgbl/src/slots/partition.rs
@@ -137,7 +137,7 @@
 
         self.data.prepare_for_sync();
 
-        match block_dev.write_gpt_partition_mut(
+        match block_dev.write_gpt_partition(
             self.partition,
             self.partition_offset,
             self.get_mut_data().as_bytes_mut(),
diff --git a/gbl/libstorage/BUILD b/gbl/libstorage/BUILD
index 74f8182..5982a33 100644
--- a/gbl/libstorage/BUILD
+++ b/gbl/libstorage/BUILD
@@ -24,6 +24,7 @@
         "src/gpt.rs",
         "src/lib.rs",
         "src/multi_blocks.rs",
+        "src/non_blocking.rs",
     ],
     crate_name = "gbl_storage",
     edition = "2021",
diff --git a/gbl/libstorage/src/gpt.rs b/gbl/libstorage/src/gpt.rs
index 92a4ba3..2f88d69 100644
--- a/gbl/libstorage/src/gpt.rs
+++ b/gbl/libstorage/src/gpt.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::{aligned_subslice, read, write_bytes, write_bytes_mut, BlockIo, Result, StorageError};
+use crate::{aligned_subslice, read, write_bytes_mut, BlockIo, Result, StorageError};
 use core::default::Default;
 use core::mem::{align_of, size_of};
 use core::num::NonZeroU64;
@@ -135,6 +135,8 @@
     // and LESS THAN OR EQUAL TO the value of GPT_MAX_NUM_ENTRIES.
     // Values other than GPT_MAX_NUM_ENTRIES are mostly used in unit tests.
     max_entries: u64,
+    // Block size of the GPT disk.
+    block_size: u64,
 }
 
 impl GptInfo {
@@ -176,7 +178,8 @@
             return Err(StorageError::InvalidInput);
         }
         let buffer = aligned_subslice(buffer, GPT_ENTRY_ALIGNMENT)?;
-        *GptInfo::from_bytes(buffer) = GptInfo { num_valid_entries: None, max_entries };
+        *GptInfo::from_bytes(buffer) =
+            GptInfo { num_valid_entries: None, max_entries, block_size: 0 };
         Self::from_existing(buffer)
     }
 
@@ -372,6 +375,7 @@
                 Some(idx) => idx as u64,
                 _ => self.info.max_entries,
             });
+        self.info.block_size = block_size;
         Ok(())
     }
 }
@@ -385,64 +389,24 @@
     gpt.load_and_sync(blk_dev, scratch)
 }
 
-/// Check if a relative offset/len into a partition overflows and returns the absolute offset.
-fn check_offset(
-    blk_dev: &mut (impl BlockIo + ?Sized),
-    entry: &GptEntry,
+/// Checks if a read/write range into a GPT partition overflows and returns the range's absolute
+/// offset in the block device.
+pub(crate) fn check_gpt_rw_params(
+    gpt_cache_buffer: &mut [u8],
+    part_name: &str,
     offset: u64,
-    len: usize,
+    size: usize,
 ) -> Result<u64> {
-    let s = SafeNum::from(offset) + len;
-    let total_size = SafeNum::from(entry.blocks()?) * blk_dev.block_size();
-    match u64::try_from(s)? <= total_size.try_into()? {
-        true => Ok((SafeNum::from(entry.first) * blk_dev.block_size() + offset).try_into()?),
+    let gpt = Gpt::from_existing(gpt_cache_buffer)?;
+    let entry = gpt.find_partition(part_name)?;
+    let end: u64 = (SafeNum::from(offset) + size).try_into()?;
+    let total_size = SafeNum::from(entry.blocks()?) * gpt.info.block_size;
+    match end <= total_size.try_into()? {
+        true => Ok((SafeNum::from(entry.first) * gpt.info.block_size + offset).try_into()?),
         false => Err(StorageError::OutOfRange),
     }
 }
 
-/// Read GPT partition. Library internal helper for AsBlockDevice::read_gpt_partition().
-pub(crate) fn read_gpt_partition(
-    blk_dev: &mut (impl BlockIo + ?Sized),
-    gpt: &Gpt,
-    part_name: &str,
-    offset: u64,
-    out: &mut [u8],
-    scratch: &mut [u8],
-) -> Result<()> {
-    let e = gpt.find_partition(part_name)?;
-    let abs_offset = check_offset(blk_dev, e, offset, out.len())?;
-    read(blk_dev, abs_offset, out, scratch)
-}
-
-/// Write GPT partition. Library internal helper for AsBlockDevice::write_gpt_partition().
-pub(crate) fn write_gpt_partition(
-    blk_dev: &mut (impl BlockIo + ?Sized),
-    gpt: &Gpt,
-    part_name: &str,
-    offset: u64,
-    data: &[u8],
-    scratch: &mut [u8],
-) -> Result<()> {
-    let e = gpt.find_partition(part_name)?;
-    let abs_offset = check_offset(blk_dev, e, offset, data.len())?;
-    write_bytes(blk_dev, abs_offset, data, scratch)
-}
-
-/// Write GPT partition. Library internal helper for AsBlockDevice::write_gpt_partition().
-/// Optimized version for mutable buffers.
-pub(crate) fn write_gpt_partition_mut(
-    blk_dev: &mut (impl BlockIo + ?Sized),
-    gpt: &Gpt,
-    part_name: &str,
-    offset: u64,
-    data: &mut [u8],
-    scratch: &mut [u8],
-) -> Result<()> {
-    let e = gpt.find_partition(part_name)?;
-    let abs_offset = check_offset(blk_dev, e, offset, data.as_ref().len())?;
-    write_bytes_mut(blk_dev, abs_offset, data.as_mut(), scratch)
-}
-
 fn crc32(data: &[u8]) -> u32 {
     let mut hasher = Hasher::new();
     hasher.update(data);
@@ -652,37 +616,21 @@
 
         // "boot_a" partition
         // Mutable version
-        dev.write_gpt_partition_mut("boot_a", 0, expect_boot_a.as_mut_slice()).unwrap();
+        dev.write_gpt_partition("boot_a", 0, expect_boot_a.as_mut_slice()).unwrap();
         dev.read_gpt_partition("boot_a", 0, &mut actual_boot_a).unwrap();
         assert_eq!(expect_boot_a.to_vec(), actual_boot_a);
         // Mutable version, partial write.
-        dev.write_gpt_partition_mut("boot_a", 1, expect_boot_a[1..].as_mut()).unwrap();
-        dev.read_gpt_partition("boot_a", 1, &mut actual_boot_a[1..]).unwrap();
-        assert_eq!(expect_boot_a[1..], actual_boot_a[1..]);
-        // Immutable version
-        dev.write_gpt_partition("boot_a", 0, &expect_boot_a).unwrap();
-        dev.read_gpt_partition("boot_a", 0, &mut actual_boot_a).unwrap();
-        assert_eq!(expect_boot_a.to_vec(), actual_boot_a);
-        // Immutable version, partial write.
-        dev.write_gpt_partition("boot_a", 1, &expect_boot_a[1..]).unwrap();
+        dev.write_gpt_partition("boot_a", 1, expect_boot_a[1..].as_mut()).unwrap();
         dev.read_gpt_partition("boot_a", 1, &mut actual_boot_a[1..]).unwrap();
         assert_eq!(expect_boot_a[1..], actual_boot_a[1..]);
 
         // "boot_b" partition
         // Mutable version
-        dev.write_gpt_partition_mut("boot_b", 0, expect_boot_b.as_mut_slice()).unwrap();
+        dev.write_gpt_partition("boot_b", 0, expect_boot_b.as_mut_slice()).unwrap();
         dev.read_gpt_partition("boot_b", 0, &mut actual_boot_b).unwrap();
         assert_eq!(expect_boot_b.to_vec(), actual_boot_b);
         // Mutable version, partial write.
-        dev.write_gpt_partition_mut("boot_b", 1, expect_boot_b[1..].as_mut()).unwrap();
-        dev.read_gpt_partition("boot_b", 1, &mut actual_boot_b[1..]).unwrap();
-        assert_eq!(expect_boot_b[1..], actual_boot_b[1..]);
-        // Immutable version
-        dev.write_gpt_partition("boot_b", 0, &expect_boot_b).unwrap();
-        dev.read_gpt_partition("boot_b", 0, &mut actual_boot_b).unwrap();
-        assert_eq!(expect_boot_b.to_vec(), actual_boot_b);
-        // Immutable version, partial write.
-        dev.write_gpt_partition("boot_b", 1, &expect_boot_b[1..]).unwrap();
+        dev.write_gpt_partition("boot_b", 1, expect_boot_b[1..].as_mut()).unwrap();
         dev.read_gpt_partition("boot_b", 1, &mut actual_boot_b[1..]).unwrap();
         assert_eq!(expect_boot_b[1..], actual_boot_b[1..]);
     }
@@ -698,11 +646,9 @@
         let mut boot_b = [0u8; include_bytes!("../test/boot_b.bin").len()];
 
         assert!(dev.read_gpt_partition("boot_a", 1, &mut boot_a).is_err());
-        assert!(dev.write_gpt_partition_mut("boot_a", 1, boot_a.as_mut_slice()).is_err());
-        assert!(dev.write_gpt_partition("boot_a", 1, &boot_a).is_err());
+        assert!(dev.write_gpt_partition("boot_a", 1, boot_a.as_mut_slice()).is_err());
 
         assert!(dev.read_gpt_partition("boot_b", 1, &mut boot_b).is_err());
-        assert!(dev.write_gpt_partition_mut("boot_b", 1, boot_b.as_mut_slice()).is_err());
-        assert!(dev.write_gpt_partition("boot_b", 1, &boot_b).is_err());
+        assert!(dev.write_gpt_partition("boot_b", 1, boot_b.as_mut_slice()).is_err());
     }
 }
diff --git a/gbl/libstorage/src/lib.rs b/gbl/libstorage/src/lib.rs
index cfcfaaf..ee56515 100644
--- a/gbl/libstorage/src/lib.rs
+++ b/gbl/libstorage/src/lib.rs
@@ -23,7 +23,7 @@
 //!
 //! ```rust
 //! use gbl_storage::{
-//!     AsBlockDevice, BlockIo, BlockDevice, required_scratch_size,
+//!     AsBlockDevice, BlockIo, BlockDevice, required_scratch_size, BlockInfo, BlockIoError
 //! };
 //!
 //! /// Mocks a block device using a buffer.
@@ -32,30 +32,26 @@
 //! }
 //!
 //! impl BlockIo for RamBlockIo {
-//!     fn block_size(&mut self) -> u64 {
-//!         512
+//!     fn info(&mut self) -> BlockInfo {
+//!         BlockInfo {
+//!             block_size: 512,
+//!             num_blocks: self.storage.len() as u64 / 512,
+//!             alignment: 64,
+//!         }
 //!     }
 //!
-//!     fn num_blocks(&mut self) -> u64 {
-//!         self.storage.len() as u64 / self.block_size()
-//!     }
-//!
-//!     fn alignment(&mut self) -> u64 {
-//!         64
-//!     }
-//!
-//!     fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> bool {
+//!     fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> Result<(), BlockIoError> {
 //!         let start = blk_offset * self.block_size();
 //!         let end = start + out.len() as u64;
 //!         out.clone_from_slice(&self.storage[start as usize..end as usize]);
-//!         true
+//!         Ok(())
 //!     }
 //!
-//!     fn write_blocks(&mut self, blk_offset: u64, data: &[u8]) -> bool {
+//!     fn write_blocks(&mut self, blk_offset: u64, data: &mut [u8]) -> Result<(), BlockIoError> {
 //!         let start = blk_offset * self.block_size();
 //!         let end = start + data.len() as u64;
 //!         self.storage[start as usize..end as usize].clone_from_slice(&data);
-//!         true
+//!         Ok(())
 //!     }
 //! }
 //!
@@ -74,9 +70,7 @@
 //! ram_block_dev.read(4321, &mut out[..]).unwrap();
 //! let mut data = vec![0u8; 5678];
 //! // Mutable input. More efficient
-//! ram_block_dev.write_mut(8765, data.as_mut_slice()).unwrap();
-//! // Immutable input. Works too but not as efficient.
-//! ram_block_dev.write(8765, &data).unwrap();
+//! ram_block_dev.write(8765, data.as_mut_slice()).unwrap();
 //!
 //! // Sync GPT
 //! let _ = ram_block_dev.sync_gpt();
@@ -84,7 +78,7 @@
 //! let _ = ram_block_dev.find_partition("some-partition");
 //! // Read/Write GPT partitions with arbitrary offset, size, buffer
 //! let _ = ram_block_dev.read_gpt_partition("partition", 4321, &mut out[..]);
-//! let _ = ram_block_dev.write_gpt_partition_mut("partition", 8765, data.as_mut_slice());
+//! let _ = ram_block_dev.write_gpt_partition("partition", 8765, data.as_mut_slice());
 //!
 //! // Alterantively, you can also define a custom type that internally owns and binds the
 //! // implementation of `BlockIo` and scratch buffer together, and then implement the
@@ -110,6 +104,7 @@
 
 // Selective export of submodule types.
 mod gpt;
+use gpt::check_gpt_rw_params;
 use gpt::Gpt;
 pub use gpt::{GptEntry, GptHeader, GPT_MAGIC, GPT_NAME_LEN_U16};
 
@@ -118,6 +113,9 @@
 mod multi_blocks;
 pub use multi_blocks::AsMultiBlockDevices;
 
+mod non_blocking;
+pub use non_blocking::{BlockDeviceEx, IoStatus, NonBlockingBlockIo, Transaction};
+
 /// The type of Result used in this library.
 pub type Result<T> = core::result::Result<T, StorageError>;
 
@@ -126,12 +124,14 @@
 pub enum StorageError {
     ArithmeticOverflow(safemath::Error),
     BlockDeviceNotFound,
-    BlockIoError,
+    BlockIoError(BlockIoError),
     BlockIoNotProvided,
     FailedGettingBlockDevices(Option<&'static str>),
+    IoAborted,
     InvalidInput,
     NoValidGpt,
     NotExist,
+    NotReady,
     OutOfRange,
     PartitionNotUnique,
     ScratchTooSmall,
@@ -149,42 +149,68 @@
     }
 }
 
+impl From<BlockIoError> for StorageError {
+    fn from(val: BlockIoError) -> Self {
+        Self::BlockIoError(val)
+    }
+}
+
 impl core::fmt::Display for StorageError {
     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        match self {
-            StorageError::ArithmeticOverflow(e) => write!(f, "Arithmetic overflow {:?}", e),
-            StorageError::BlockDeviceNotFound => write!(f, "Block device not found"),
-            StorageError::BlockIoError => write!(f, "Block IO error"),
-            StorageError::BlockIoNotProvided => write!(f, "Block IO is not provided"),
-            StorageError::FailedGettingBlockDevices(v) => {
-                write!(f, "Failed to iterate all block devices {:?}", v)
-            }
-            StorageError::InvalidInput => write!(f, "Invalid input"),
-            StorageError::NoValidGpt => write!(f, "GPT not found"),
-            StorageError::NotExist => write!(f, "The specified partition could not be found"),
-            StorageError::OutOfRange => write!(f, "Out of range"),
-            StorageError::PartitionNotUnique => {
-                write!(f, "Partition is found on multiple block devices")
-            }
-            StorageError::ScratchTooSmall => write!(f, "Not enough scratch buffer"),
-        }
+        write!(f, "{:?}", self)
     }
 }
 
+/// `BlockInfo` contains information for a block device.
+pub struct BlockInfo {
+    /// Native block size of the block device.
+    pub block_size: u64,
+    /// Total number of blocks of the block device.
+    pub num_blocks: u64,
+    /// The alignment requirement for IO buffers. For example, many block device drivers use DMA
+    /// for data transfer, which typically requires that the buffer address for DMA be aligned to
+    /// 16/32/64 bytes etc. If the block device has no alignment requirement, it can return 1.
+    pub alignment: u64,
+}
+
+impl BlockInfo {
+    /// Computes the total size in bytes of the block device.
+    pub fn total_size(&self) -> Result<u64> {
+        Ok((SafeNum::from(self.block_size) * self.num_blocks).try_into()?)
+    }
+}
+
+/// `BlockIoError` represents the error code returned by implementations of the `BlockIo` and
+/// `NonBlockingBlockIo` interfaces.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum BlockIoError {
+    MediaBusy,
+    Others(Option<&'static str>),
+}
+
 /// `BlockIo` contains methods for reading/writing blocks of data to a block device with aligned
 /// input/output buffers.
 pub trait BlockIo {
+    /// Gets the `BlockInfo` for this block device.
+    fn info(&mut self) -> BlockInfo;
+
     /// Returns the block size of the block device.
-    fn block_size(&mut self) -> u64;
+    fn block_size(&mut self) -> u64 {
+        self.info().block_size
+    }
 
     /// Returns the total number of blocks of the block device.
-    fn num_blocks(&mut self) -> u64;
+    fn num_blocks(&mut self) -> u64 {
+        self.info().num_blocks
+    }
 
     /// Returns the alignment requirement for buffers passed to the `write_blocks()` and
     /// `read_blocks()` methods. For example, many block device drivers use DMA for data transfer,
     /// which typically requires that the buffer address for DMA be aligned to 16/32/64 bytes etc.
     /// If the block device has no alignment requirement, it can return 1.
-    fn alignment(&mut self) -> u64;
+    fn alignment(&mut self) -> u64 {
+        self.info().alignment
+    }
 
     /// Read blocks of data from the block device
     ///
@@ -198,7 +224,11 @@
     /// # Returns
     ///
     /// Returns true if exactly out.len() number of bytes are read. Otherwise false.
-    fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> bool;
+    fn read_blocks(
+        &mut self,
+        blk_offset: u64,
+        out: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError>;
 
     /// Write blocks of data to the block device
     ///
@@ -212,7 +242,11 @@
     /// # Returns
     ///
     /// Returns true if exactly data.len() number of bytes are written. Otherwise false.
-    fn write_blocks(&mut self, blk_offset: u64, data: &[u8]) -> bool;
+    fn write_blocks(
+        &mut self,
+        blk_offset: u64,
+        data: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError>;
 }
 
 /// `Partition` contains information about a GPT partition.
@@ -295,7 +329,7 @@
     ///
     /// * GPT headers will be cached in the scratch buffer after calling `Self::sync_gpt()` and
     ///   returning success. Subsequent call of `Self:read_gpt_partiton()`,
-    ///   `Self::write_gpt_partition()`, and `Self::write_gpt_partition_mut()`
+    ///   and `Self::write_gpt_partition()`
     ///   will look up partition entries from the cached GPT header.
     ///   Thus callers should make sure to always return the same scratch buffer and avoid
     ///   modifying its content.
@@ -344,31 +378,12 @@
     ///
     /// * `data`: Data to write.
     ///
-    /// * If offset`/`data.len()` is not aligned to `Self::block_size()`
-    ///   or data.as_ptr() is not aligned to `Self::alignment()`, the API may
-    ///   reduce to a block by block read-modify-write in the worst case, which can be inefficient.
-    ///
-    /// * Returns success when exactly `data.len()` number of bytes are written.
-    fn write(&mut self, offset: u64, data: &[u8]) -> Result<()> {
-        with_partitioned_scratch(self, |io, alignment_scratch, _, _| {
-            write_bytes(io, offset, data, alignment_scratch)
-        })?
-    }
-
-    /// Write data to the device.
-    ///
-    /// # Args
-    ///
-    /// * `offset`: Offset in number of bytes.
-    ///
-    /// * `data`: Data to write.
-    ///
     /// * The API enables an optimization which temporarily changes `data` layout internally and
     ///   reduces the number of calls to `Self::write_blocks()` down to O(1) regardless of input's
     ///   alignment. This is the recommended usage.
     ///
     /// * Returns success when exactly `data.len()` number of bytes are written.
-    fn write_mut(&mut self, offset: u64, data: &mut [u8]) -> Result<()> {
+    fn write(&mut self, offset: u64, data: &mut [u8]) -> Result<()> {
         with_partitioned_scratch(self, |io, alignment_scratch, _, _| {
             write_bytes_mut(io, offset, data, alignment_scratch)
         })?
@@ -427,47 +442,15 @@
     ///
     /// Returns success when exactly `out.len()` of bytes are read successfully.
     fn read_gpt_partition(&mut self, part_name: &str, offset: u64, out: &mut [u8]) -> Result<()> {
-        with_partitioned_scratch(self, |io, alignment_scratch, gpt_buffer, _| {
-            gpt::read_gpt_partition(
-                io,
-                &mut Gpt::from_existing(gpt_buffer)?,
-                part_name,
-                offset,
-                out,
-                alignment_scratch,
-            )
-        })?
-    }
-
-    /// Write a GPT partition on a block device
-    ///
-    /// # Args
-    ///
-    /// * `part_name`: Name of the partition.
-    ///
-    /// * `offset`: Offset in number of bytes into the partition.
-    ///
-    /// * `data`: Data to write. See `data` passed to `BlockIo::write()` for details.
-    ///
-    /// # Returns
-    ///
-    /// Returns success when exactly `data.len()` of bytes are written successfully.
-    fn write_gpt_partition(&mut self, part_name: &str, offset: u64, data: &[u8]) -> Result<()> {
-        with_partitioned_scratch(self, |io, alignment_scratch, gpt_buffer, _| {
-            gpt::write_gpt_partition(
-                io,
-                &mut Gpt::from_existing(gpt_buffer)?,
-                part_name,
-                offset,
-                data,
-                alignment_scratch,
-            )
-        })?
+        let offset = with_partitioned_scratch(self, |_, _, gpt_buffer, _| {
+            check_gpt_rw_params(gpt_buffer, part_name, offset, out.len())
+        })??;
+        self.read(offset, out)
     }
 
     /// Write a GPT partition on a block device.
     /// Optimization for mutable buffers.
-    /// See `AsBlockDevice::write_mut` for details on alignment requirements
+    /// See `AsBlockDevice::write` for details on alignment requirements
     /// for optimized performance.
     ///
     /// # Args
@@ -481,22 +464,11 @@
     /// # Returns
     ///
     /// Returns success when exactly `data.len()` of bytes are written successfully.
-    fn write_gpt_partition_mut(
-        &mut self,
-        part_name: &str,
-        offset: u64,
-        data: &mut [u8],
-    ) -> Result<()> {
-        with_partitioned_scratch(self, |io, alignment_scratch, gpt_buffer, _| {
-            gpt::write_gpt_partition_mut(
-                io,
-                &mut Gpt::from_existing(gpt_buffer)?,
-                part_name,
-                offset,
-                data.into(),
-                alignment_scratch,
-            )
-        })?
+    fn write_gpt_partition(&mut self, part_name: &str, offset: u64, data: &mut [u8]) -> Result<()> {
+        let offset = with_partitioned_scratch(self, |_, _, gpt_buffer, _| {
+            check_gpt_rw_params(gpt_buffer, part_name, offset, data.len())
+        })??;
+        self.write(offset, data)
     }
 }
 
@@ -598,10 +570,7 @@
     out: &mut [u8],
 ) -> Result<()> {
     let blk_offset = check_range(blk_io, offset, out).map(u64::try_from)??;
-    if blk_io.read_blocks(blk_offset, out) {
-        return Ok(());
-    }
-    Err(StorageError::BlockIoError)
+    Ok(blk_io.read_blocks(blk_offset, out)?)
 }
 
 /// Read with block-aligned offset and aligned buffer. Size don't need to be block aligned.
@@ -767,12 +736,13 @@
     )
 }
 
-fn write_aligned_all(blk_io: &mut (impl BlockIo + ?Sized), offset: u64, data: &[u8]) -> Result<()> {
+fn write_aligned_all(
+    blk_io: &mut (impl BlockIo + ?Sized),
+    offset: u64,
+    data: &mut [u8],
+) -> Result<()> {
     let blk_offset = check_range(blk_io, offset, data)?;
-    if blk_io.write_blocks(blk_offset.try_into()?, data) {
-        return Ok(());
-    }
-    Err(StorageError::BlockIoError)
+    Ok(blk_io.write_blocks(blk_offset.try_into()?, data)?)
 }
 
 /// Write with block-aligned offset and aligned buffer. `data.len()` can be unaligned.
@@ -781,7 +751,7 @@
 fn write_aligned_offset_and_buffer(
     blk_io: &mut (impl BlockIo + ?Sized),
     offset: u64,
-    data: &[u8],
+    data: &mut [u8],
     scratch: &mut [u8],
 ) -> Result<()> {
     debug_assert!(is_aligned(offset.into(), blk_io.block_size().into())?);
@@ -790,7 +760,7 @@
     let aligned_write: usize =
         SafeNum::from(data.len()).round_down(blk_io.block_size()).try_into()?;
     if aligned_write > 0 {
-        write_aligned_all(blk_io, offset, &data[..aligned_write])?;
+        write_aligned_all(blk_io, offset, &mut data[..aligned_write])?;
     }
     let unaligned = &data[aligned_write..];
     if unaligned.len() == 0 {
@@ -805,52 +775,6 @@
     write_aligned_all(blk_io, unaligned_start, block_scratch)
 }
 
-/// Write data to the device with immutable input bytes slice. However, if `offset`/`data.len()`
-/// is not aligned to `blk_io.block_size()` or data.as_ptr() is not aligned to
-/// `blk_io.alignment()`, the API may reduce to a block by block read-modify-write in the worst
-/// case.
-fn write_bytes(
-    blk_io: &mut (impl BlockIo + ?Sized),
-    offset: u64,
-    data: &[u8],
-    scratch: &mut [u8],
-) -> Result<()> {
-    let (_, block_scratch) = split_scratch(blk_io, scratch)?;
-    let block_size = SafeNum::from(blk_io.block_size());
-    let mut data_offset = SafeNum::ZERO;
-    let mut offset = SafeNum::from(offset);
-    while usize::try_from(data_offset)? < data.len() {
-        if is_aligned(offset, block_size)?
-            && is_buffer_aligned(&data[data_offset.try_into()?..], blk_io.alignment())?
-        {
-            return write_aligned_offset_and_buffer(
-                blk_io,
-                offset.try_into()?,
-                &data[data_offset.try_into()?..],
-                block_scratch,
-            );
-        }
-
-        let block_offset = offset.round_down(block_size);
-        let copy_offset = offset - block_offset;
-        let copy_size =
-            min(data[data_offset.try_into()?..].len(), (block_size - copy_offset).try_into()?);
-        if copy_size < block_size.try_into()? {
-            // Partial block copy. Perform read-modify-write
-            read_aligned_all(blk_io, block_offset.try_into()?, block_scratch)?;
-        }
-        block_scratch[copy_offset.try_into()?..(copy_offset + copy_size).try_into()?]
-            .clone_from_slice(
-                &data[data_offset.try_into()?..(data_offset + copy_size).try_into()?],
-            );
-        write_aligned_all(blk_io, block_offset.try_into()?, block_scratch)?;
-        data_offset += copy_size;
-        offset += copy_size;
-    }
-
-    Ok(())
-}
-
 /// Swap the position of sub segment [0..pos] and [pos..]
 fn swap_slice(slice: &mut [u8], pos: usize) {
     let (left, right) = slice.split_at_mut(pos);
@@ -888,7 +812,7 @@
             write_aligned_offset_and_buffer(
                 blk_io,
                 aligned_start,
-                &data[aligned_relative_offset..],
+                &mut data[aligned_relative_offset..],
                 scratch,
             )?;
         } else {
@@ -896,8 +820,12 @@
                 (SafeNum::from(data.len()) - aligned_relative_offset).try_into()?;
             // Swap the offset-aligned part to the beginning of the buffer (assumed aligned)
             swap_slice(data, aligned_relative_offset);
-            let res =
-                write_aligned_offset_and_buffer(blk_io, aligned_start, &data[..write_len], scratch);
+            let res = write_aligned_offset_and_buffer(
+                blk_io,
+                aligned_start,
+                &mut data[..write_len],
+                scratch,
+            );
             // Swap the two parts back before checking the result.
             swap_slice(data, write_len);
             res?;
@@ -1028,10 +956,11 @@
     const READ_WRITE_BLOCKS_UPPER_BOUND: usize = 6;
 
     fn read_test_helper(case: &TestCase) {
+        let data = (0..case.storage_size).map(|v| v as u8).collect::<Vec<_>>();
         let mut blk = TestBlockDeviceBuilder::new()
             .set_alignment(case.alignment)
             .set_block_size(case.block_size)
-            .set_size(case.storage_size as usize)
+            .set_data(&data)
             .build();
         // Make an aligned buffer. A misaligned version is created by taking a sub slice that
         // starts at an unaligned offset. Because of this we need to allocate
@@ -1059,8 +988,7 @@
         let mut blk = TestBlockDeviceBuilder::new()
             .set_alignment(case.alignment)
             .set_block_size(case.block_size)
-            .set_size(case.storage_size as usize)
-            .set_data(&data[..])
+            .set_data(&data)
             .build();
         // Write a reverse version of the current data.
         let rw_offset = SafeNum::from(case.rw_offset);
@@ -1108,31 +1036,11 @@
                     read_test_helper(&TestCase::new(x0, x1, x2, x3, x4, x5));
                 }
 
-                // Input bytes slice is an immutable reference
-                #[test]
-                fn write_test() {
-                    let func = |blk: &mut TestBlockDevice, offset: u64, data: &mut [u8]| {
-                        blk.write(offset, data).unwrap();
-                    };
-                    write_test_helper(&TestCase::new($x0, $x1, $x2, $x3, $x4, $x5), func);
-                }
-
-                #[test]
-                fn write_scaled_test() {
-                    // Scaled all parameters by double and test again.
-                    let func = |blk: &mut TestBlockDevice, offset: u64, data: &mut [u8]| {
-                        blk.write(offset, data).unwrap();
-                    };
-                    let (x0, x1, x2, x3, x4, x5) =
-                        (2 * $x0, 2 * $x1, 2 * $x2, 2 * $x3, 2 * $x4, 2 * $x5);
-                    write_test_helper(&TestCase::new(x0, x1, x2, x3, x4, x5), func);
-                }
-
                 // Input bytes slice is a mutable reference
                 #[test]
                 fn write_mut_test() {
                     let func = |blk: &mut TestBlockDevice, offset: u64, data: &mut [u8]| {
-                        blk.write_mut(offset, data).unwrap();
+                        blk.write(offset, data).unwrap();
                         assert!(blk.io.num_reads <= READ_WRITE_BLOCKS_UPPER_BOUND);
                         assert!(blk.io.num_writes <= READ_WRITE_BLOCKS_UPPER_BOUND);
                     };
@@ -1145,7 +1053,7 @@
                     let (x0, x1, x2, x3, x4, x5) =
                         (2 * $x0, 2 * $x1, 2 * $x2, 2 * $x3, 2 * $x4, 2 * $x5);
                     let func = |blk: &mut TestBlockDevice, offset: u64, data: &mut [u8]| {
-                        blk.write_mut(offset, data).unwrap();
+                        blk.write(offset, data).unwrap();
                         assert!(blk.io.num_reads <= READ_WRITE_BLOCKS_UPPER_BOUND);
                         assert!(blk.io.num_writes <= READ_WRITE_BLOCKS_UPPER_BOUND);
                     };
@@ -1433,11 +1341,8 @@
             .set_max_gpt_entries(0)
             .set_size(512)
             .build();
-        assert!(blk.write_mut(512, vec![0u8; 1].as_mut_slice()).is_err());
-        assert!(blk.write_mut(0, vec![0u8; 513].as_mut_slice()).is_err());
-
-        assert!(blk.write(512, &vec![0u8; 1]).is_err());
-        assert!(blk.write(0, &vec![0u8; 513]).is_err());
+        assert!(blk.write(512, vec![0u8; 1].as_mut_slice()).is_err());
+        assert!(blk.write(0, vec![0u8; 513].as_mut_slice()).is_err());
     }
 
     #[test]
@@ -1448,8 +1353,7 @@
             .set_max_gpt_entries(0)
             .set_size(512)
             .build();
-        assert!(blk.write_mut(u64::MAX, vec![0u8; 1].as_mut_slice()).is_err());
-        assert!(blk.write(u64::MAX, &vec![0u8; 1]).is_err());
+        assert!(blk.write(u64::MAX, vec![0u8; 1].as_mut_slice()).is_err());
     }
 
     #[test]
diff --git a/gbl/libstorage/src/multi_blocks.rs b/gbl/libstorage/src/multi_blocks.rs
index 8c193a7..9525e31 100644
--- a/gbl/libstorage/src/multi_blocks.rs
+++ b/gbl/libstorage/src/multi_blocks.rs
@@ -91,20 +91,6 @@
     /// Writes a GPT partition with mutable input buffer.
     ///
     /// Returns Ok(()) if the partition is unique among all block devices and write is successful.
-    fn write_gpt_partition_mut(
-        &mut self,
-
-        part_name: &str,
-        offset: u64,
-        data: &mut [u8],
-    ) -> Result<()> {
-        self.check_part(part_name)?;
-        until_ok(self, |dev, _| dev.write_gpt_partition_mut(part_name, offset, &mut data[..]))
-    }
-
-    /// Writes a GPT partition with const input buffer.
-    ///
-    /// Returns Ok(()) if the partition is unique among all block devices and write is successful.
     fn write_gpt_partition(&mut self, part_name: &str, offset: u64, data: &mut [u8]) -> Result<()> {
         self.check_part(part_name)?;
         until_ok(self, |dev, _| dev.write_gpt_partition(part_name, offset, &mut data[..]))
@@ -291,12 +277,12 @@
         let to_write = &mut data[off.try_into().unwrap()..];
 
         let mut out = vec![0u8; to_write.len()];
-        devs.write_gpt_partition_mut(part, off, to_write).unwrap();
+        devs.write_gpt_partition(part, off, to_write).unwrap();
         devs.read_gpt_partition(part, off, &mut out[..]).unwrap();
         assert_eq!(out, to_write.to_vec());
 
         to_write.reverse();
-        devs.write_gpt_partition_mut(part, off, to_write).unwrap();
+        devs.write_gpt_partition(part, off, to_write).unwrap();
         devs.read_gpt_partition(part, off, &mut out[..]).unwrap();
         assert_eq!(out, to_write.to_vec());
     }
@@ -336,7 +322,6 @@
         ]);
         devs.sync_gpt_all(&mut |_, _, _| panic!("GPT sync failed"));
         assert!(devs.read_gpt_partition("boot_a", 0, &mut []).is_err());
-        assert!(devs.write_gpt_partition_mut("boot_a", 0, &mut []).is_err());
         assert!(devs.write_gpt_partition("boot_a", 0, &mut []).is_err());
         assert!(devs.find_partition("boot_a").is_err());
     }
diff --git a/gbl/libstorage/src/non_blocking.rs b/gbl/libstorage/src/non_blocking.rs
new file mode 100644
index 0000000..6aca840
--- /dev/null
+++ b/gbl/libstorage/src/non_blocking.rs
@@ -0,0 +1,753 @@
+// Copyright 2024, The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::{
+    is_aligned, is_buffer_aligned, BlockInfo, BlockIo, BlockIoError, Result, StorageError,
+};
+use core::{marker::PhantomData, mem::swap};
+
+/// `IoStatus` represents the status of a non-blocking IO.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum IoStatus {
+    /// The IO request is aborted due to error or user request.
+    Aborted,
+    /// The IO request is completed.
+    Completed,
+    /// The IO request is still pending.
+    Pending,
+    /// The IO request doesn't exist.
+    NotFound,
+}
+
+/// `NonBlockingBlockIo` provides interfaces for performing non-blocking read/write.
+///
+/// # Safety
+///
+/// * Implementation must guarantee that `Self::check_status(buffer)` returns `IoStatus::Pending`
+///   if and only if it is retaining `buffer`. Once the implementation stops returning
+///   `IoStatus::Pending` for a buffer, it must not retain the buffer again until a new read/write
+///   request using the buffer is made.
+/// * The buffer pointer passed to `Self::check_status(buffer)` should only be used as a key value
+///   for looking up previously made read/write IO requests being tracked. Implementation should not
+///   attempt to dereference it without confirming that the corresponding IO exists. If caller
+///   passes an invalid buffer pointer, implementation should be able to safely return
+///   `IoStatus::NotFound`.
+/// * If `Self::write_blocks()`/`Self::read_blocks()` returns error, the input buffer must not be
+///   retained.
+pub unsafe trait NonBlockingBlockIo {
+    /// Returns the `BlockInfo` for this block device.
+    fn info(&mut self) -> BlockInfo;
+
+    /// Perform non-blocking writes of data to the block device.
+    ///
+    /// # Args
+    ///
+    /// * `blk_offset`: Offset in number of blocks.
+    /// * `buffer`: Pointer to the data buffer.
+    ///
+    /// # Returns
+    ///
+    /// * Returns Ok(()) if the IO request is accepted.
+    /// * Returns Err(BlockIoError::MediaBusy) if the device is busy and the caller should try
+    ///   again later.
+    /// * Returns Err(BlockIoError::Others()) for other errors.
+    ///
+    /// # Safety
+    ///
+    /// * Caller must ensure that `buffer` points to a valid buffer.
+    /// * If the method returns Ok(()), caller must ensure that `buffer` remains valid and has no
+    ///   other references until `Self::check_status(buffer)` no longer returns
+    ///   `IoStatus::Pending`.
+    unsafe fn write_blocks(
+        &mut self,
+        blk_offset: u64,
+        buffer: *mut [u8],
+    ) -> core::result::Result<(), BlockIoError>;
+
+    /// Perform non-blocking read of data from the block device.
+    ///
+    /// # Args
+    ///
+    /// * `blk_offset`: Offset in number of blocks.
+    /// * `buffer`: Pointer to the output buffer.
+    ///
+    /// # Returns
+    ///
+    /// * Returns Ok(()) if the IO request is accepted.
+    /// * Returns Err(BlockIoError::MediaBusy) if the device is busy and the caller should try again
+    ///   later.
+    /// * Returns Err(BlockIoError::Others()) for other errors.
+    ///
+    /// # Safety
+    ///
+    /// * Caller must ensure that `buffer` points to a valid buffer.
+    /// * If the method returns Ok(()), caller must ensure that `buffer` remains valid and has no
+    ///   other references until `Self::check_status(buffer)` no longer returns
+    ///   `IoStatus::Pending`.
+    unsafe fn read_blocks(
+        &mut self,
+        blk_offset: u64,
+        buffer: *mut [u8],
+    ) -> core::result::Result<(), BlockIoError>;
+
+    /// Checks the status of the non-blocking read/write associated with the given buffer.
+    ///
+    /// # Args
+    ///
+    /// * `buf`: The buffer previously passed to `read_blocks()` / `write_blocks()`.
+    ///
+    /// # Returns
+    ///
+    /// * Returns `IoStatus::NotFound` if the request is not found.
+    /// * Returns `IoStatus::Pending` if the request is still pending.
+    /// * Returns `IoStatus::Completed` if the request has been completed successfully.
+    ///   Implementation can stop tracking the request and should return `IoStatus::NotFound` on
+    ///   subsequent queries until a new read/write request is made with the same buffer.
+    /// * Returns `IoStatus::Aborted` if the request is aborted, due to error or caller invoking
+    ///   `Self::abort()`. Implementation can stop tracking the request and should return
+    ///   `IoStatus::NotFound` on subsequent queries until a new read/write request is made with
+    ///   the same buffer.
+    fn check_status(&mut self, buf: *mut [u8]) -> IoStatus;
+
+    /// Aborts pending non-blocking IO requests.
+    ///
+    /// For currently pending requests, `Self::check_status(buf)` should eventually return
+    /// `IoStatus::Aborted` at some point in the future. For already completed requests,
+    /// `Self::check_status(buf)` should continue to return `IoStatus::Completed`.
+    fn abort(&mut self) -> core::result::Result<(), BlockIoError>;
+}
+
+// Implements the blocking version `BlockIo` for a `&mut dyn NonBlockingBlockIo`
+impl BlockIo for &mut dyn NonBlockingBlockIo {
+    fn info(&mut self) -> BlockInfo {
+        (*self).info()
+    }
+
+    fn read_blocks(
+        &mut self,
+        blk_offset: u64,
+        out: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        let ptr = out as *mut [u8];
+        // SAFETY:
+        // * This function blocks until the non-blocking IO is no longer pending.
+        // * Buffer by `ptr` is not used elsewhere.
+        unsafe { (*self).read_blocks(blk_offset, ptr)? };
+        loop {
+            match self.check_status(ptr) {
+                IoStatus::Pending => {}
+                IoStatus::Completed => return Ok(()),
+                IoStatus::Aborted => return Err(BlockIoError::Others(Some("Read aborted"))),
+                IoStatus::NotFound => panic!("Unexpected IoStatus::NotFound"),
+            }
+        }
+    }
+
+    fn write_blocks(
+        &mut self,
+        blk_offset: u64,
+        data: &mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        let ptr = data as *mut [u8];
+        // SAFETY:
+        // * This function blocks until the non-blocking IO is no longer pending.
+        // * Buffer by `ptr` is not used elsewhere.
+        unsafe { (*self).write_blocks(blk_offset, ptr)? };
+        loop {
+            match self.check_status(ptr) {
+                IoStatus::Pending => {}
+                IoStatus::Completed => return Ok(()),
+                IoStatus::Aborted => return Err(BlockIoError::Others(Some("write aborted"))),
+                IoStatus::NotFound => panic!("Unexpected IoStatus::NotFound"),
+            }
+        }
+    }
+}
+
+/// `BlockDeviceIo` represents either a `BlockIo` or `NonBlockingBlockIo`.
+pub enum BlockDeviceIo<'a> {
+    Blocking(&'a mut dyn BlockIo),
+    NonBlocking(&'a mut dyn NonBlockingBlockIo),
+}
+
+impl<'a> From<&'a mut dyn BlockIo> for BlockDeviceIo<'a> {
+    fn from(val: &'a mut dyn BlockIo) -> Self {
+        Self::Blocking(val)
+    }
+}
+
+impl<'a> From<&'a mut dyn NonBlockingBlockIo> for BlockDeviceIo<'a> {
+    fn from(val: &'a mut dyn NonBlockingBlockIo) -> Self {
+        Self::NonBlocking(val)
+    }
+}
+
+impl<'a> BlockDeviceIo<'a> {
+    /// Casts to a `BlockIo` trait object.
+    fn as_block_io(&mut self) -> &mut dyn BlockIo {
+        match self {
+            Self::Blocking(v) => *v,
+            Self::NonBlocking(v) => v,
+        }
+    }
+
+    /// Creates a sub-instance that borrows internal fields.
+    ///
+    /// This creates an instance where its lifetime parameter 'a is coerced to the life time of the
+    /// current object. This will be used for creating a local instance for blocking operation.
+    fn scoped_instance(&mut self) -> BlockDeviceIo {
+        match self {
+            Self::Blocking(v) => BlockDeviceIo::Blocking(*v),
+            Self::NonBlocking(v) => BlockDeviceIo::NonBlocking(*v),
+        }
+    }
+}
+
+/// `IoBufferState` wraps a raw buffer and keeps track of its use state in non-blocking IO.
+#[derive(Debug)]
+enum IoBufferState<'a> {
+    Ready(&'a mut [u8], IoStatus),
+    // (Original buffer &mut [u8], subslice pointer passed to non-blocking IO, phantom)
+    Pending(*mut [u8], *mut [u8], PhantomData<&'a mut [u8]>),
+}
+
+/// `IoBuffer` wraps a `IoBufferState` and implements `Drop` to check that the buffer is not
+/// pending when going out of scope.
+#[derive(Debug)]
+struct IoBuffer<'a>(IoBufferState<'a>);
+
+impl Drop for IoBuffer<'_> {
+    fn drop(&mut self) {
+        // Panics if an `IoBuffer` goes out of scope in a pending state.
+        // This is merely used for safety reasoning in `read_io_buffer()`/`write_io_buffer()` and
+        // should not be triggered if implementation logic is correct. Specifically, `IoBuffer`
+        // is only used internally in `BlockDeviceEx`. When `BlockDeviceEx` goes out of scope, it
+        // performs abort() and sync() to make sure no buffer is pending.
+        assert!(!self.is_pending());
+    }
+}
+
+impl<'a> IoBuffer<'a> {
+    /// Creates a new instance.
+    fn new(buffer: &'a mut [u8]) -> Self {
+        Self(IoBufferState::Ready(buffer, IoStatus::Completed))
+    }
+
+    /// Gets the cached status.
+    ///
+    /// To update the cached status, caller should call `Self::update()` first.
+    fn status(&self) -> IoStatus {
+        match self.0 {
+            IoBufferState::Ready(_, status) => status,
+            _ => IoStatus::Pending,
+        }
+    }
+
+    /// Returns whether the buffer is pending in a non-blocking IO.
+    ///
+    /// The returned value is based on the cached status. To update the cached status, caller
+    /// should call `Self::update()` first.
+    fn is_pending(&self) -> bool {
+        matches!(self.status(), IoStatus::Pending)
+    }
+
+    /// Returns whether the corresponding IO is aborted.
+    ///
+    /// The returned value is based on the cached status. To update the cached status, caller
+    /// should call `Self::update()` first.
+    fn is_aborted(&self) -> bool {
+        matches!(self.status(), IoStatus::Aborted)
+    }
+
+    /// Sets buffer to the pending state.
+    ///
+    /// Returns the pointer to the specified subslice that can be passed to
+    /// `NonBlockingBlockIo::read_blocks()` and `NonBlockingBlockIo::write_blocks()` interfaces.
+    fn set_pending(&mut self, io_offset: usize, io_size: usize) -> *mut [u8] {
+        match &mut self.0 {
+            IoBufferState::Ready(b, _) => {
+                let ptr = &mut b[io_offset..][..io_size] as *mut [u8];
+                self.0 = IoBufferState::Pending(*b as _, ptr, PhantomData);
+                ptr
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    /// Gets the buffer if not pending
+    fn get(&mut self) -> &mut [u8] {
+        match &mut self.0 {
+            IoBufferState::Ready(buffer, _) => buffer,
+            _ => unreachable!(),
+        }
+    }
+
+    /// Updates the IO status
+    fn update(&mut self, io: &mut dyn NonBlockingBlockIo) {
+        match &mut self.0 {
+            IoBufferState::Ready(_, _) => {}
+            IoBufferState::Pending(buffer, ptr, _) => {
+                match io.check_status(*ptr) {
+                    IoStatus::NotFound => unreachable!(), // Logic error.
+                    IoStatus::Pending => {}
+                    v => {
+                        // SAFETY:
+                        // * `buffer` is a valid pointer as it came from
+                        //   `IoBufferState::Ready(buffer, _)`
+                        // * status is no longer pending, buffer is not retained any more.
+                        self.0 = IoBufferState::Ready(unsafe { &mut **buffer }, v);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Consumes and returns the raw buffer.
+    fn take(mut self) -> &'a mut [u8] {
+        match &mut self.0 {
+            IoBufferState::Ready(buffer, _) => {
+                // IoBuffer has a drop implementation, thus we can't move buffer out directly.
+                // The solution is to swap with an empty slice, which is valid for any lifetime.
+                let mut res = &mut [][..];
+                swap(&mut res, buffer);
+                res
+            }
+            _ => unreachable!(), // Logic error.
+        }
+    }
+}
+
+/// `Transaction` tracks the non-blocking read/write IO request made by
+/// `BlockDeviceEx::read_scoped()` and `BlockDeviceEx::write_scoped()`. It automatically performs
+/// sync when going out of scope.
+pub struct Transaction<'a, 'b> {
+    dev: BlockDeviceEx<'a, 'a>,
+    _phantom: PhantomData<&'b mut [u8]>,
+}
+
+impl Transaction<'_, '_> {
+    /// Waits until the IO is completed or aborted; returns `Err(IoAborted)` if it was aborted.
+    pub fn sync(mut self) -> Result<()> {
+        self.do_sync()
+    }
+
+    /// Syncs and consumes the IO result, so the `Drop` call after `sync()` doesn't panic on it.
+    fn do_sync(&mut self) -> Result<()> {
+        self.dev.sync()?;
+        match self.dev.current_io.take().map(|io| io.is_aborted()) {
+            Some(true) => Err(StorageError::IoAborted),
+            _ => Ok(()),
+        }
+    }
+}
+
+impl Drop for Transaction<'_, '_> {
+    fn drop(&mut self) {
+        // We expect caller to sync() themselves if they expect errors. If not the drop will
+        // perform the sync but panics on error.
+        self.do_sync().unwrap()
+    }
+}
+
+/// `BlockDeviceEx` provides safe APIs for performing blocking/non-blocking read/write.
+///
+/// `'a`: Lifetime of the borrow to BlockIo / NonBlockingBlockIo,
+/// `'b`: Lifetime of the external user buffers that will be passed to `Self::read()` and
+///       `Self::write()`.
+pub struct BlockDeviceEx<'a, 'b> {
+    io: BlockDeviceIo<'a>,
+    current_io: Option<IoBuffer<'b>>,
+}
+
+impl<'a, 'b> BlockDeviceEx<'a, 'b> {
+    /// Creates a new instance.
+    pub fn new(io: BlockDeviceIo<'a>) -> Self {
+        Self { io, current_io: None }
+    }
+
+    /// Checks if any IO buffer is pending.
+    pub fn is_pending(&self) -> bool {
+        self.current_io.as_ref().map(|v| v.is_pending()).unwrap_or(false)
+    }
+
+    /// Updates the IO status.
+    fn update_status(&mut self) {
+        let BlockDeviceIo::NonBlocking(ref mut io) = self.io else {
+            return;
+        };
+
+        match self.current_io.as_mut() {
+            Some(buffer) => buffer.update(*io),
+            _ => {}
+        }
+    }
+
+    /// Polls and updates IO status.
+    pub fn poll(&mut self) {
+        if self.current_io.is_some() {
+            self.update_status();
+        }
+    }
+
+    /// Aborts the current IO.
+    pub fn abort(&mut self) -> Result<()> {
+        match &mut self.io {
+            BlockDeviceIo::NonBlocking(io) => Ok(io.abort()?),
+            _ => Ok(()),
+        }
+    }
+
+    /// Checks if any IO is aborted.
+    pub fn is_aborted(&self) -> bool {
+        match self.current_io.as_ref() {
+            Some(buffer) => buffer.is_aborted(),
+            _ => false,
+        }
+    }
+
+    /// Waits until the IO is completed or aborted.
+    pub fn sync(&mut self) -> Result<()> {
+        while self.is_pending() {
+            self.poll();
+        }
+        Ok(())
+    }
+
+    /// Checks whether an IO is currently in progress.
+    fn check_busy(&self) -> Result<()> {
+        // No IO implies not pending.
+        match self.current_io.is_some() {
+            true => Err(StorageError::NotReady),
+            _ => Ok(()),
+        }
+    }
+
+    /// Writes data from `buffer[offset..][..size]` to the block device at offset `dst_offset`.
+    ///
+    /// # Args
+    ///
+    /// * `dst_offset`: Destination offset to write in the block device.
+    /// * `buffer`: On input, it must be a `Some(buffer)` that contains the data to write. On
+    ///   success, the buffer will be taken and it will be set to `None`. On error, `buffer` will
+    ///   remain the same so that caller can continue to access the buffer. When the IO completes
+    ///   or aborts, caller can retrieve the buffer via `Self::take_io_buffer()`.
+    /// * `offset`: Offset of the data to write in `buffer`.
+    /// * `size`: Size of the data to write.
+    pub fn write(
+        &mut self,
+        dst_offset: u64,
+        buffer: &mut Option<&'b mut [u8]>,
+        offset: usize,
+        size: usize,
+    ) -> Result<()> {
+        self.check_busy()?;
+        let blk_size = self.io.as_block_io().block_size();
+        // TODO(b/338439051): Implement support for arbitrarily aligned buffer and read range.
+        assert_eq!(dst_offset % blk_size, 0);
+        let buffer_raw = buffer.take().ok_or(StorageError::InvalidInput)?;
+        let mut io_buffer = IoBuffer::new(buffer_raw);
+        match write_io_buffer(&mut self.io, dst_offset / blk_size, &mut io_buffer, offset, size) {
+            Err(e) => {
+                // Error. Returns the buffer to caller.
+                *buffer = Some(io_buffer.take());
+                Err(e)
+            }
+            Ok(()) => {
+                self.current_io = Some(io_buffer);
+                Ok(())
+            }
+        }
+    }
+
+    /// Reads data from the block device at offset `dst_offset` into `buffer[offset..][..size]`.
+    ///
+    /// # Args
+    ///
+    /// * `dst_offset`: Destination offset to read from the block device.
+    /// * `buffer`: On input, it must be a `Some(buffer)` that contains the output buffer. On
+    ///   success, the buffer will be taken and it will be set to `None`. On error, `buffer` will
+    ///   remain the same so that caller can continue to access the buffer. When the IO completes
+    ///   or aborts, caller can retrieve the buffer via `Self::take_io_buffer()`.
+    /// * `offset`: Offset of `buffer` to read to.
+    /// * `size`: Size of the read.
+    pub fn read(
+        &mut self,
+        dst_offset: u64,
+        buffer: &mut Option<&'b mut [u8]>,
+        offset: usize,
+        size: usize,
+    ) -> Result<()> {
+        self.check_busy()?;
+        let blk_size = self.io.as_block_io().block_size();
+        // TODO(b/338439051): Implement support for arbitrarily aligned buffer and read range.
+        assert_eq!(dst_offset % blk_size, 0);
+        let buffer_raw = buffer.take().ok_or(StorageError::InvalidInput)?;
+        let mut io_buffer = IoBuffer::new(buffer_raw);
+        match read_io_buffer(&mut self.io, dst_offset / blk_size, &mut io_buffer, offset, size) {
+            Err(e) => {
+                // Error. Returns the buffer to caller.
+                *buffer = Some(io_buffer.take());
+                Err(e)
+            }
+            Ok(()) => {
+                self.current_io = Some(io_buffer);
+                Ok(())
+            }
+        }
+    }
+
+    /// Retrieves the IO buffer if it is completed/aborted.
+    pub fn take_io_buffer(&mut self) -> Result<&'b mut [u8]> {
+        match self.current_io {
+            None => Err(StorageError::NotExist),
+            Some(_) => match !self.is_pending() {
+                true => Ok(self.current_io.take().unwrap().take()),
+                _ => Err(StorageError::NotReady),
+            },
+        }
+    }
+
+    /// Returns an instance that borrows the internal field.
+    ///
+    /// This creates an instance where its lifetime parameters 'a/'b are coerced to the lifetime
+    /// of the current object. This will be used for creating a local instance for blocking
+    /// operation.
+    fn scoped_instance(&mut self) -> Result<BlockDeviceEx> {
+        self.check_busy()?;
+        Ok(BlockDeviceEx { io: self.io.scoped_instance(), current_io: None })
+    }
+
+    /// Performs a non-blocking write and returns a `Transaction` object which automatically
+    /// performs sync when going out of scope.
+    pub fn write_scoped<'c, 'd: 'c>(
+        &'c mut self,
+        offset: u64,
+        data: &'d mut [u8],
+    ) -> Result<Transaction<'c, 'd>> {
+        let mut dev = self.scoped_instance()?;
+        let len = data.len();
+        dev.write(offset, &mut Some(data), 0, len)?;
+        Ok(Transaction { dev, _phantom: PhantomData })
+    }
+
+    /// Performs a non-blocking read and returns a `Transaction` object which automatically
+    /// performs sync when going out of scope.
+    pub fn read_scoped<'c, 'd: 'c>(
+        &'c mut self,
+        offset: u64,
+        out: &'d mut [u8],
+    ) -> Result<Transaction<'c, 'd>> {
+        let mut dev = self.scoped_instance()?;
+        let len = out.len();
+        dev.read(offset, &mut Some(out), 0, len)?;
+        Ok(Transaction { dev, _phantom: PhantomData })
+    }
+
+    /// Performs blocking write.
+    pub fn write_blocking(&mut self, offset: u64, data: &mut [u8]) -> Result<()> {
+        self.write_scoped(offset, data)?.sync()
+    }
+
+    /// Performs blocking read.
+    pub fn read_blocking(&mut self, offset: u64, out: &mut [u8]) -> Result<()> {
+        self.read_scoped(offset, out)?.sync()
+    }
+}
+
+impl Drop for BlockDeviceEx<'_, '_> {
+    fn drop(&mut self) {
+        self.abort().unwrap();
+        self.sync().unwrap();
+    }
+}
+
+/// Writes `buffer[offset..][..size]` to the block device; on error `buffer` stays ready.
+fn write_io_buffer(
+    io: &mut BlockDeviceIo,
+    blk_offset: u64,
+    buffer: &mut IoBuffer,
+    offset: usize,
+    size: usize,
+) -> Result<()> {
+    let data = &mut buffer.get()[offset..][..size];
+    assert!(is_buffer_aligned(data, io.as_block_io().alignment().into())?);
+    assert!(is_aligned(size.into(), io.as_block_io().block_size().into())?);
+    Ok(match io {
+        BlockDeviceIo::Blocking(io) => io.write_blocks(blk_offset, data),
+        BlockDeviceIo::NonBlocking(io) => {
+            let ptr = buffer.set_pending(offset, size);
+            // SAFETY: `set_pending()` hides the buffer from safe code until `IoBuffer::update()`
+            // sees a non-pending status, and `IoBuffer` panics if dropped while still pending.
+            let res = unsafe { (*io).write_blocks(blk_offset, ptr) };
+            if let (Err(_), IoBufferState::Pending(raw, ..)) = (&res, &buffer.0) {
+                // SAFETY: `raw` came from the original `&mut`; a rejected request retains nothing.
+                buffer.0 = IoBufferState::Ready(unsafe { &mut **raw }, IoStatus::Completed);
+            }
+            res
+        }
+    }?)
+}
+
+/// Reads from the block device into `buffer[offset..][..size]`; on error `buffer` stays ready.
+fn read_io_buffer(
+    io: &mut BlockDeviceIo,
+    blk_offset: u64,
+    buffer: &mut IoBuffer,
+    offset: usize,
+    size: usize,
+) -> Result<()> {
+    let out = &mut buffer.get()[offset..][..size];
+    assert!(is_buffer_aligned(out, io.as_block_io().alignment().into())?);
+    assert!(is_aligned(size.into(), io.as_block_io().block_size().into())?);
+    Ok(match io {
+        BlockDeviceIo::Blocking(io) => io.read_blocks(blk_offset, out),
+        BlockDeviceIo::NonBlocking(io) => {
+            let ptr = buffer.set_pending(offset, size);
+            // SAFETY: `set_pending()` hides the buffer from safe code until `IoBuffer::update()`
+            // sees a non-pending status, and `IoBuffer` panics if dropped while still pending.
+            let res = unsafe { (*io).read_blocks(blk_offset, ptr) };
+            if let (Err(_), IoBufferState::Pending(raw, ..)) = (&res, &buffer.0) {
+                // SAFETY: `raw` came from the original `&mut`; a rejected request retains nothing.
+                buffer.0 = IoBufferState::Ready(unsafe { &mut **raw }, IoStatus::Completed);
+            }
+            res
+        }
+    }?)
+}
+
+#[cfg(test)]
+mod test {
+    use gbl_storage_testlib::{TestBlockDeviceBuilder, TimestampPauser};
+
+    #[test]
+    fn test_write() {
+        let mut blk = TestBlockDeviceBuilder::new()
+            .set_alignment(1)
+            .set_block_size(1)
+            .set_data(&[1, 2, 3, 4])
+            .build();
+        let mut io_buffer = [1, 0, 0, 1];
+        let mut to_write = Some(&mut io_buffer[..]);
+        {
+            let timestamp_pauser = TimestampPauser::new();
+            let mut blk_ex = blk.as_block_device_ex();
+            blk_ex.write(1, &mut to_write, 1, 2).unwrap();
+            assert!(to_write.is_none());
+            // Timestamp paused. IO not being processed. It should remain pending after poll().
+            blk_ex.poll();
+            assert!(blk_ex.is_pending());
+
+            timestamp_pauser.resume();
+            blk_ex.sync().unwrap();
+            blk_ex.poll();
+        }
+        assert_eq!(blk.io.storage, [1, 0, 0, 4]);
+    }
+
+    #[test]
+    fn test_read() {
+        let mut blk = TestBlockDeviceBuilder::new()
+            .set_alignment(1)
+            .set_block_size(1)
+            .set_data(&[1, 2, 3, 4])
+            .build();
+        let mut io_buffer = [1, 0, 0, 1];
+        let mut to_read = Some(&mut io_buffer[..]);
+        {
+            let timestamp_pauser = TimestampPauser::new();
+            let mut blk_ex = blk.as_block_device_ex();
+            blk_ex.read(1, &mut to_read, 1, 2).unwrap();
+            assert!(to_read.is_none());
+            // Timestamp paused. IO not being processed. It should remain pending after poll().
+            blk_ex.poll();
+            assert!(blk_ex.is_pending());
+
+            timestamp_pauser.resume();
+            blk_ex.sync().unwrap();
+            blk_ex.poll();
+        }
+        assert_eq!(io_buffer, [1, 2, 3, 1]);
+    }
+
+    #[test]
+    fn test_read_write_blocking() {
+        let mut blk = TestBlockDeviceBuilder::new()
+            .set_alignment(1)
+            .set_block_size(1)
+            .set_data(&[1, 2, 3, 4])
+            .build();
+
+        let mut io_buffer = [0u8; 2];
+        blk.as_block_device_ex().read_blocking(1, &mut io_buffer[..]).unwrap();
+        assert_eq!(io_buffer, [2, 3]);
+
+        let mut io_buffer = [0u8; 2];
+        blk.as_block_device_ex().write_blocking(1, &mut io_buffer[..]).unwrap();
+        assert_eq!(blk.io.storage, [1, 0, 0, 4]);
+    }
+
+    #[test]
+    fn test_abort() {
+        let mut blk = TestBlockDeviceBuilder::new()
+            .set_alignment(1)
+            .set_block_size(1)
+            .set_data(&[1, 2, 3, 4])
+            .build();
+        let mut io_buffer = [1, 0, 0, 1];
+        let mut to_write = Some(&mut io_buffer[..]);
+        {
+            let _pauser = TimestampPauser::new(); // `let _ = ..` would drop it right away.
+            let mut blk_ex = blk.as_block_device_ex();
+            blk_ex.write(1, &mut to_write, 1, 2).unwrap();
+            blk_ex.abort().unwrap();
+            blk_ex.sync().unwrap();
+            assert!(blk_ex.is_aborted())
+        }
+        assert_eq!(blk.io.storage, [1, 2, 3, 4]);
+
+        let mut to_read = Some(&mut io_buffer[..]);
+        {
+            let _pauser = TimestampPauser::new(); // `let _ = ..` would drop it right away.
+            let mut blk_ex = blk.as_block_device_ex();
+            blk_ex.read(1, &mut to_read, 1, 2).unwrap();
+            blk_ex.abort().unwrap();
+            blk_ex.sync().unwrap();
+            assert!(blk_ex.is_aborted())
+        }
+        assert_eq!(io_buffer, [1, 0, 0, 1]);
+    }
+
+    #[test]
+    fn read_write_error_on_busy() {
+        let mut blk = TestBlockDeviceBuilder::new()
+            .set_alignment(1)
+            .set_block_size(1)
+            .set_data(&[1, 2, 3, 4])
+            .build();
+        let mut io_buffer = [1, 0, 0, 1];
+        let mut to_write = Some(&mut io_buffer[..]);
+
+        let mut io_buffer_other = [0u8; 4];
+        let mut io_other_ref = Some(&mut io_buffer_other[..]);
+        {
+            let _pauser = TimestampPauser::new(); // `let _ = ..` would drop it right away.
+            let mut blk_ex = blk.as_block_device_ex();
+            blk_ex.write(1, &mut to_write, 1, 2).unwrap();
+            assert!(blk_ex.write(1, &mut io_other_ref, 1, 2).is_err());
+            assert!(io_other_ref.is_some());
+            assert!(blk_ex.read(1, &mut io_other_ref, 1, 2).is_err());
+            assert!(io_other_ref.is_some());
+        }
+    }
+}
diff --git a/gbl/libstorage/src/testlib.rs b/gbl/libstorage/src/testlib.rs
index 21c776f..25a6219 100644
--- a/gbl/libstorage/src/testlib.rs
+++ b/gbl/libstorage/src/testlib.rs
@@ -11,16 +11,64 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use core::cell::RefCell;
+use crc32fast::Hasher;
 pub use gbl_storage::{
     alignment_scratch_size, is_aligned, is_buffer_aligned, required_scratch_size, AsBlockDevice,
-    AsMultiBlockDevices, BlockIo, GptEntry, GptHeader, GPT_MAGIC, GPT_NAME_LEN_U16,
+    AsMultiBlockDevices, BlockDeviceEx, BlockInfo, BlockIo, BlockIoError, GptEntry, GptHeader,
+    IoStatus, NonBlockingBlockIo, GPT_MAGIC, GPT_NAME_LEN_U16,
 };
-
-use crc32fast::Hasher;
 use safemath::SafeNum;
 use std::collections::BTreeMap;
 use zerocopy::AsBytes;
 
+// Declares a per-thread global instance of timestamp. The timestamp is used to control the
+// execution of non-blocking IO.
+thread_local! {
+    static TIMESTAMP: RefCell<u64> = RefCell::new(Default::default());
+    /// Number of `TimestampPauser` in effect.
+    static NUM_TIMESTAMP_PAUSER: RefCell<u64> = RefCell::new(Default::default());
+}
+
+/// Increases the value of timestamp.
+pub fn advance_timestamp() {
+    TIMESTAMP.with(|ts| (*ts.borrow_mut()) += 1);
+}
+
+/// Returns the timestamp, first advancing it by one unless a `TimestampPauser` is alive.
+pub fn query_timestamp() -> u64 {
+    NUM_TIMESTAMP_PAUSER.with(|v| (*v.borrow() == 0).then(|| advance_timestamp()));
+    TIMESTAMP.with(|ts| *ts.borrow())
+}
+
+/// Stops queries from advancing the timestamp while alive; `let _ = ...` drops it immediately.
+pub struct TimestampPauser {}
+
+impl Drop for TimestampPauser {
+    fn drop(&mut self) {
+        NUM_TIMESTAMP_PAUSER.with(|v| *v.borrow_mut() -= 1);
+    }
+}
+
+impl TimestampPauser {
+    /// Creates a new instance to pause the timestamp.
+    pub fn new() -> Self {
+        NUM_TIMESTAMP_PAUSER.with(|v| *v.borrow_mut() += 1);
+        Self {}
+    }
+
+    /// Consumes the pauser, causing it to go out of scope. When all pausers go out of scope,
+    /// timestamp resumes.
+    pub fn resume(self) {}
+}
+
+/// `NonBlockingIoState` tracks the non-blocking IO state.
+enum NonBlockingIoState {
+    // (timestamp when initiated, blk offset, buffer, is read)
+    Pending(u64, u64, &'static mut [u8], bool),
+    Ready(IoStatus),
+}
+
 /// Helper `gbl_storage::BlockIo` struct for TestBlockDevice.
 pub struct TestBlockIo {
     /// The storage block size in bytes.
@@ -33,11 +81,13 @@
     pub num_writes: usize,
     /// The number of successful read calls.
     pub num_reads: usize,
+    /// State of the current non-blocking IO: pending, or finished but not yet reported.
+    io: Option<NonBlockingIoState>,
 }
 
 impl TestBlockIo {
     pub fn new(block_size: u64, alignment: u64, data: Vec<u8>) -> Self {
-        Self { block_size, alignment, storage: data, num_writes: 0, num_reads: 0 }
+        Self { block_size, alignment, storage: data, num_writes: 0, num_reads: 0, io: None }
     }
 
     fn check_alignment(&mut self, buffer: &[u8]) -> bool {
@@ -47,40 +97,111 @@
 }
 
 impl BlockIo for TestBlockIo {
-    fn block_size(&mut self) -> u64 {
-        self.block_size
+    fn info(&mut self) -> BlockInfo {
+        NonBlockingBlockIo::info(self)
     }
 
-    fn num_blocks(&mut self) -> u64 {
-        self.storage.len() as u64 / self.block_size()
+    fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> Result<(), BlockIoError> {
+        // `BlockIo` is implemented for `&mut dyn NonBlockingBlockIo`
+        BlockIo::read_blocks(&mut (self as &mut dyn NonBlockingBlockIo), blk_offset, out)
     }
 
-    fn alignment(&mut self) -> u64 {
-        self.alignment
+    fn write_blocks(&mut self, blk_offset: u64, data: &mut [u8]) -> Result<(), BlockIoError> {
+        BlockIo::write_blocks(&mut (self as &mut dyn NonBlockingBlockIo), blk_offset, data)
     }
+}
 
-    fn read_blocks(&mut self, blk_offset: u64, out: &mut [u8]) -> bool {
-        if !self.check_alignment(out) {
-            return false;
+// SAFETY:
+// * When `TestBlockIo::io` is `Some(NonBlockingIoState::Pending(_, _, buffer, _))`,
+//   `check_status()` always returns `IoStatus::Pending`. `check_status()` returns other `IoStatus`
+//   values if and only if `TestBlockIo::io` is not `Some(NonBlockingIoState::Pending(..))`,
+//   in which case the buffer is not tracked anymore and thus will not be retained again.
+// * `Self::check_status()` does not dereference the input pointer.
+// * `TestBlockIo::io` is set to `Some(NonBlockingIoState::Pending(_, _, buffer, _))` and retains
+//   the buffer only on success (returning Ok(())).
+unsafe impl NonBlockingBlockIo for TestBlockIo {
+    /// Returns a `BlockInfo` for the block device.
+    fn info(&mut self) -> BlockInfo {
+        BlockInfo {
+            block_size: self.block_size,
+            num_blocks: u64::try_from(self.storage.len()).unwrap() / self.block_size,
+            alignment: self.alignment,
         }
-
-        let start = SafeNum::from(blk_offset) * self.block_size();
-        let end = start + out.len();
-        out.clone_from_slice(&self.storage[start.try_into().unwrap()..end.try_into().unwrap()]);
-        self.num_reads += 1;
-        true
     }
 
-    fn write_blocks(&mut self, blk_offset: u64, data: &[u8]) -> bool {
-        if !self.check_alignment(data) {
-            return false;
+    unsafe fn write_blocks(
+        &mut self,
+        blk_offset: u64,
+        buffer: *mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        match self.io {
+            Some(_) => Err(BlockIoError::MediaBusy),
+            _ => {
+                self.num_writes += 1;
+                // SAFETY: By safety requirement, trait implementation can retain the buffer until
+                // it no longer returns `IoStatus::Pending` in `Self::check_status()`.
+                let buffer = unsafe { &mut *buffer };
+                assert!(self.check_alignment(buffer));
+                self.io =
+                    Some(NonBlockingIoState::Pending(query_timestamp(), blk_offset, buffer, false));
+                Ok(())
+            }
         }
+    }
 
-        let start = SafeNum::from(blk_offset) * self.block_size();
-        let end = start + data.len();
-        self.storage[start.try_into().unwrap()..end.try_into().unwrap()].clone_from_slice(&data);
-        self.num_writes += 1;
-        true
+    unsafe fn read_blocks(
+        &mut self,
+        blk_offset: u64,
+        buffer: *mut [u8],
+    ) -> core::result::Result<(), BlockIoError> {
+        match self.io {
+            Some(_) => Err(BlockIoError::MediaBusy),
+            _ => {
+                self.num_reads += 1;
+                // SAFETY: By safety requirement, trait implementation can retain the buffer until
+                // it no longer returns `IoStatus::Pending` in `Self::check_status()`.
+                let buffer = unsafe { &mut *buffer };
+                assert!(self.check_alignment(buffer));
+                self.io =
+                    Some(NonBlockingIoState::Pending(query_timestamp(), blk_offset, buffer, true));
+                Ok(())
+            }
+        }
+    }
+
+    fn check_status(&mut self, buf: *mut [u8]) -> IoStatus {
+        match self.io.as_mut() {
+            Some(NonBlockingIoState::Pending(ts, blk_offset, ref mut buffer, is_read))
+                if std::ptr::eq(*buffer as *const [u8], buf as _) =>
+            {
+                // Executes the IO if current timestamp is newer.
+                if query_timestamp() > *ts {
+                    let offset = (SafeNum::from(*blk_offset) * self.block_size).try_into().unwrap();
+                    match is_read {
+                        true => buffer.clone_from_slice(&self.storage[offset..][..buffer.len()]),
+                        _ => self.storage[offset..][..buffer.len()].clone_from_slice(buffer),
+                    }
+                    self.io = Some(NonBlockingIoState::Ready(IoStatus::Completed));
+                }
+                IoStatus::Pending
+            }
+            Some(NonBlockingIoState::Ready(v)) => {
+                let res = *v;
+                self.io.take();
+                res
+            }
+            _ => IoStatus::NotFound,
+        }
+    }
+
+    fn abort(&mut self) -> core::result::Result<(), BlockIoError> {
+        match self.io {
+            Some(NonBlockingIoState::Pending(_, _, _, _)) => {
+                self.io = Some(NonBlockingIoState::Ready(IoStatus::Aborted));
+            }
+            _ => {}
+        }
+        Ok(())
     }
 }
 
@@ -93,6 +214,12 @@
     max_gpt_entries: u64,
 }
 
+impl TestBlockDevice {
+    pub fn as_block_device_ex(&mut self) -> BlockDeviceEx {
+        BlockDeviceEx::new((&mut self.io as &mut dyn NonBlockingBlockIo).into())
+    }
+}
+
 impl From<&[u8]> for TestBlockDevice {
     fn from(data: &[u8]) -> Self {
         TestBlockDeviceBuilder::new().set_data(data).build()