Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the use of std::future #16829

Draft
wants to merge 1 commit into
base: branch-24.10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class reader::impl {
* @return pair of boolean indicating if compressed chunks were found and a future for
* read completion
*/
std::pair<bool, std::vector<std::future<void>>> read_column_chunks();
std::pair<bool, std::future<void>> read_column_chunks();

/**
* @brief Read compressed data and page information for the current pass.
Expand Down
36 changes: 19 additions & 17 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,14 @@ void generate_depth_remappings(
}
}
auto sync_fn = [](decltype(read_tasks) read_tasks) {
for (auto& task : read_tasks) {
task.wait();
std::vector<std::thread> threads;
for (std::size_t task_idx = 0; task_idx < read_tasks.size(); ++task_idx) {
threads.emplace_back([](std::future<size_t>& task) { task.wait(); },
std::ref(read_tasks[task_idx]));
}

for (auto&& thread : threads) {
thread.join();
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
}
};
return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));
Expand Down Expand Up @@ -964,7 +970,7 @@ void reader::impl::allocate_level_decode_space()
}
}

std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks()
std::pair<bool, std::future<void>> reader::impl::read_column_chunks()
{
auto const& row_groups_info = _pass_itm_data->row_groups;

Expand All @@ -989,7 +995,6 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks
// TODO: make this respect the pass-wide skip_rows/num_rows instead of the file-wide
// skip_rows/num_rows
// auto remaining_rows = num_rows;
std::vector<std::future<void>> read_chunk_tasks;
size_type chunk_count = 0;
for (auto const& rg : row_groups_info) {
auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index);
Expand Down Expand Up @@ -1018,16 +1023,15 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks
}

// Read compressed chunk data to device memory
read_chunk_tasks.push_back(read_column_chunks_async(_sources,
raw_page_data,
chunks,
0,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
_stream));

return {total_decompressed_size > 0, std::move(read_chunk_tasks)};
return {total_decompressed_size > 0,
read_column_chunks_async(_sources,
raw_page_data,
chunks,
0,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
_stream)};
}

void reader::impl::read_compressed_data()
Expand All @@ -1042,9 +1046,7 @@ void reader::impl::read_compressed_data()
auto const [has_compressed_data, read_chunks_tasks] = read_column_chunks();
pass.has_compressed_data = has_compressed_data;

for (auto& task : read_chunks_tasks) {
task.wait();
}
read_chunks_tasks.wait();

// Process dataset chunk pages into output columns
auto const total_pages = _has_page_index ? count_page_headers_with_pgidx(chunks, _stream)
Expand Down
Loading