Skip to content

Commit

Permalink
Replace PageAlignedBuffer with aligned BytesMut
Browse files Browse the repository at this point in the history
  • Loading branch information
sk1p committed Aug 29, 2024
1 parent 1bfd1e6 commit cc5c131
Showing 1 changed file with 15 additions and 73 deletions.
88 changes: 15 additions & 73 deletions src/storage/store/store_sync/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ use crate::{
},
};

use bytes::BytesMut;
use parking_lot::RwLock;
use thiserror::Error;
use walkdir::WalkDir;

use std::{
alloc::{alloc_zeroed, dealloc, handle_alloc_error, Layout},
collections::HashMap,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -53,75 +52,11 @@ use std::os::unix::fs::OpenOptionsExt;

/// For `O_DIRECT`, we need a buffer that is aligned to the page size and is a
/// multiple of the page size.
struct PageAlignedBuffer {
buf: *mut u8,
layout: Layout,
}

impl PageAlignedBuffer {
/// Allocate a new page-size aligned buffer of `size` bytes. The actual size
/// will be rounded up to the next largest multiple of the page size.
pub fn new(size: usize) -> Self {
let align = page_size::get();
let layout = Layout::from_size_align(size, align)
.expect("size and align are reasonable")
.pad_to_align();

assert!(layout.size() > 0);
// SAFETY: `layout` is non-zero, as asserted above.
let buf = unsafe { alloc_zeroed(layout) };

// buf can be zero when out of memory, or if the allocator doesn't like
// our `Layout`
if buf.is_null() {
handle_alloc_error(layout);
}

Self { buf, layout }
}
}

impl Deref for PageAlignedBuffer {
type Target = [u8];

fn deref(&self) -> &Self::Target {
// SAFETY:
// * "data must be valid for reads for len * mem::size_of::<T>() many bytes, and it must be properly aligned"
// => T is u8 => alignment is trivial
// => `self.buf` is non-null, as per `buf.is_null` check above
// => `self.buf` is a single allocation
// * "`data` must point to len consecutive properly initialized values of type T."
// => `self.buf` is zero-initialized
// * "The memory referenced by the returned slice must not be mutated for the duration of lifetime 'a, except inside an UnsafeCell"
// => guaranteed by the borrow checker for us
// * "The total size len * mem::size_of::<T>() of the slice must be no
// larger than isize::MAX, and adding that size to data must not “wrap
// around” the address space."
// => given from the invariants of `Layout`
unsafe { std::slice::from_raw_parts(self.buf, self.layout.size()) }
}
}

impl DerefMut for PageAlignedBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
// SAFETY: see `deref` with the following modification:
// "The memory referenced by the returned slice must not be accessed
// through any other pointer (not derived from the return value) for the
// duration of lifetime 'a. Both read and write accesses are forbidden."
// => guaranteed by the mutable borrow
unsafe { std::slice::from_raw_parts_mut(self.buf, self.layout.size()) }
}
}

impl Drop for PageAlignedBuffer {
fn drop(&mut self) {
// SAFETY:
// * "ptr must denote a block of memory currently allocated via this allocator,"
// => we get the pointer from `System.alloc_zeroed`, and it is only free'd here in `drop`
// * "layout must be the same layout that was used to allocate that block of memory."
// => we use the `Layout` value previously used for allocation
unsafe { dealloc(self.buf, self.layout) }
}
fn bytes_aligned(size: usize) -> BytesMut {
let align = page_size::get();
let mut bytes = BytesMut::with_capacity(size + 2 * align);
let offset = bytes.as_ptr().align_offset(align);
bytes.split_off(offset)
}

/// Options for use with [`FilesystemStore`]
Expand Down Expand Up @@ -320,9 +255,16 @@ impl FilesystemStore {

// Write
if enable_direct {
let mut buf = PageAlignedBuffer::new(value.len());
buf[0..value.len()].copy_from_slice(value);
let mut buf = bytes_aligned(value.len());
buf.extend_from_slice(value);

// Pad to page size
let pad_size = buf.len().next_multiple_of(page_size::get()) - buf.len();
buf.extend(std::iter::repeat(0).take(pad_size));

file.write_all(&buf)?;

// Truncate again to requested size
file.set_len(value.len() as u64)?;
} else {
if let Some(offset) = offset {
Expand Down

0 comments on commit cc5c131

Please sign in to comment.