From d287dae6d45d93342b821c9a851ed6b17a32f108 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Wed, 18 Sep 2024 10:54:48 -0400
Subject: [PATCH] Simplify future-related code and enable concurrency

---
 cpp/src/io/parquet/reader_impl.hpp           |  6 +++---
 cpp/src/io/parquet/reader_impl_preprocess.cu | 36 +++++++++++---------
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index 2d46da14bec..1d8d05d0766 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -191,7 +191,7 @@ class reader::impl {
-   * @return pair of boolean indicating if compressed chunks were found and a vector of futures for
-   *         read completion
+   * @return pair of boolean indicating if compressed chunks were found and a future for read
+   *         completion
    */
-  std::pair<bool, std::vector<std::future<void>>> read_column_chunks();
+  std::pair<bool, std::future<void>> read_column_chunks();
 
   /**
    * @brief Read compressed data and page information for the current pass.
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 8e67f233213..df7b5025200 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -276,8 +276,14 @@ void generate_depth_remappings(
     }
   }
   auto sync_fn = [](decltype(read_tasks) read_tasks) {
-    for (auto& task : read_tasks) {
-      task.wait();
+    std::vector<std::thread> threads;
+    for (std::size_t task_idx = 0; task_idx < read_tasks.size(); ++task_idx) {
+      threads.emplace_back([](std::future<void>& task) { task.wait(); },
+                           std::ref(read_tasks[task_idx]));
+    }
+
+    for (auto&& thread : threads) {
+      thread.join();
     }
   };
   return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));
@@ -964,7 +970,7 @@ void reader::impl::allocate_level_decode_space()
   }
 }
 
-std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks()
+std::pair<bool, std::future<void>> reader::impl::read_column_chunks()
 {
   auto const& row_groups_info = _pass_itm_data->row_groups;
 
@@ -989,7 +995,6 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks()
   // TODO: make this respect the pass-wide skip_rows/num_rows instead of the file-wide
   // skip_rows/num_rows
   // auto remaining_rows = num_rows;
-  std::vector<std::future<void>> read_chunk_tasks;
   size_type chunk_count = 0;
   for (auto const& rg : row_groups_info) {
     auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index);
@@ -1018,16 +1023,15 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks()
   }
 
   // Read compressed chunk data to device memory
-  read_chunk_tasks.push_back(read_column_chunks_async(_sources,
-                                                      raw_page_data,
-                                                      chunks,
-                                                      0,
-                                                      chunks.size(),
-                                                      column_chunk_offsets,
-                                                      chunk_source_map,
-                                                      _stream));
-
-  return {total_decompressed_size > 0, std::move(read_chunk_tasks)};
+  return {total_decompressed_size > 0,
+          read_column_chunks_async(_sources,
+                                   raw_page_data,
+                                   chunks,
+                                   0,
+                                   chunks.size(),
+                                   column_chunk_offsets,
+                                   chunk_source_map,
+                                   _stream)};
 }
 
 void reader::impl::read_compressed_data()
@@ -1042,9 +1046,7 @@ void reader::impl::read_compressed_data()
   auto const [has_compressed_data, read_chunks_tasks] = read_column_chunks();
   pass.has_compressed_data = has_compressed_data;
 
-  for (auto& task : read_chunks_tasks) {
-    task.wait();
-  }
+  read_chunks_tasks.wait();
 
   // Process dataset chunk pages into output columns
   auto const total_pages = _has_page_index ? count_page_headers_with_pgidx(chunks, _stream)
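
Below is a minimal, self-contained sketch of the waiting pattern the patch
adopts, for readers outside the cudf tree. combine_read_tasks and the
simulated 100 ms "reads" are hypothetical stand-ins for
read_column_chunks_async and its per-source read tasks; only the sync_fn
body mirrors the patched code. Build with -pthread.

#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>

// Collapse a batch of read futures into one future. Waiting on the result
// spawns one waiter thread per task and joins them all, so tasks that only
// make progress inside wait() (e.g. std::launch::deferred ones) run
// concurrently instead of back to back.
std::future<void> combine_read_tasks(std::vector<std::future<void>> read_tasks)
{
  auto sync_fn = [](decltype(read_tasks) read_tasks) {
    std::vector<std::thread> threads;
    for (std::size_t task_idx = 0; task_idx < read_tasks.size(); ++task_idx) {
      threads.emplace_back([](std::future<void>& task) { task.wait(); },
                           std::ref(read_tasks[task_idx]));
    }
    for (auto&& thread : threads) {
      thread.join();
    }
  };
  // Deferred: nothing runs until the caller waits on the combined future.
  return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));
}

int main()
{
  std::vector<std::future<void>> tasks;
  for (int i = 0; i < 4; ++i) {
    // Hypothetical stand-in for one per-source column-chunk read.
    tasks.push_back(std::async(std::launch::deferred, [] {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }));
  }
  // Completes in roughly 100 ms; a sequential wait loop over the four
  // deferred tasks would take roughly 400 ms.
  combine_read_tasks(std::move(tasks)).wait();
  return 0;
}

In this sketch each task is deferred, so its work only runs inside some
thread's wait(); draining the vector from a single loop would execute the
reads one after another, while a waiter thread per task lets them overlap.
Handing the caller one combined future is also what lets
read_compressed_data() replace its wait loop with the single
read_chunks_tasks.wait() above.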