[Parquet] Support skipping pages with mask based evaluation #9118
base: main
Conversation
alamb left a comment
Thank you @sdf-jkl -- this actually makes a lot of sense to me 👏
I have a few concerns:
- I am worried about the performance overhead of this approach (copying the page index and the loop for each batch) -- I will run some benchmarks to assess this
- I do wonder if we have test coverage for this entire situation -- in particular, do we have tests that repeatedly call `next_mask_chunk` after the first page and make sure we get the right rows?
If the performance looks good, I think we should add some more tests -- maybe @hhhizzz has some ideas on how to do this (or I think I can try and find some time to help out / work with codex to do so)
```rust
/// Using the row selection to skip(4), page2 won't be read at all, so in this
/// case we can't decode all the rows and apply a mask. To correctly apply the
/// bit mask, we need all 6 values to be read, but page2 is not in memory.
fn override_selector_strategy_if_needed(
```
nice -- the idea is to avoid this function 👍
```rust
array_reader,
schema: Arc::new(schema),
read_plan,
page_offsets: page_offsets.map(|slice| Arc::new(slice.to_vec())),
```
So I think this will effectively copy the entire OffsetIndexMetaData structure (which I worry could be quite large)
I wonder if we need to find a way to avoid this (e.g. making the entire thing Arc'd in https://github.com/apache/arrow-rs/blob/67e04e758f1e62ec3d78d2f678daf433a4c54e30/parquet/src/file/metadata/mod.rs#L197-L196 somehow 🤔 )
We could store only the &Vec<PageLocation> instead of the entire OffsetIndexMetaData: df9a493
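As a rough illustration of why the `Arc` route is attractive (the `ReaderState` struct and `arc_from_slice` helper here are hypothetical, not the PR's actual types): once the locations sit behind an `Arc`, handing them to each new reader is a reference-count bump instead of a deep copy.

```rust
use std::sync::Arc;

use parquet::format::PageLocation;

// Hypothetical holder; the real reader stores much more than this.
struct ReaderState {
    page_offsets: Option<Arc<[PageLocation]>>,
}

// Build the Arc once from the borrowed slice: this pays for one copy
// up front, after which every clone is just a pointer bump.
fn arc_from_slice(locs: &[PageLocation]) -> Arc<[PageLocation]> {
    locs.to_vec().into()
}
```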
```rust
while cursor < mask.len() && selected_rows < batch_size {
    let mut page_end = mask.len();
    if let Some(pages) = page_locations {
        for loc in pages {
```
I am also a little worried that this loop will take too long (it is O(N^2) in the number of pages, as each call looks through all pages)
Maybe we could potentially add a PageLocationIterator to the cursor itself (so we know where to pick up)
Maybe a binary search through a vec of page offsets? Would have to construct the vec once beforehand to keep us from rebuilding it.
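A minimal sketch of that suggestion, assuming a sorted vec of page-start rows built once per row group (`page_starts` and `next_page_boundary` are illustrative names, not the PR's API):

```rust
/// Given page start rows sorted ascending and the current cursor row,
/// return the row at which the page containing `cursor` ends
/// (or `total_rows` if the cursor is in the last page).
fn next_page_boundary(page_starts: &[usize], cursor: usize, total_rows: usize) -> usize {
    // Find the first page start strictly greater than `cursor`: that
    // start row is exactly where the current page ends. O(log n) per lookup.
    let idx = page_starts.partition_point(|&start| start <= cursor);
    page_starts.get(idx).copied().unwrap_or(total_rows)
}
```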
Fixed in df9a493
run benchmark arrow_reader_clickbench arrow_reader_row_filter
🤖: Benchmark completed (Details)
🤖: Benchmark completed (Details)
run benchmark arrow_reader_clickbench arrow_reader_row_filter
🤖: Benchmark completed (Details)
🤖: Benchmark completed (Details)
@alamb @Dandandan clickbench q12, 24, 30 show some degradation, but everything else looks like an overall improvement.
```diff
-let reader = ParquetRecordBatchReader::new(array_reader, plan);
+let reader =
+    ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned());
```
cloned may cause extra expense here, can we use Arc<[PageLocation]> to avoid that?
It's a big API change to make PageLocation or OffsetIndexMetaData an Arc inside ParquetMetaData.
If we'd want to make that change, I can open an issue and work up a PR.
I agree with @hhhizzz that copying the offsets here is not good
I thought about it some more, and I think the reason the copy is currently needed is that the decision of whether the page should be skipped is postponed until the next MaskChunk is needed.
One potential idea I had to avoid this is to use the page index in the ReadPlanBuilder when building, rather than passing the page index to every call of next_batch.
So maybe that would look something like extending MaskCursor from

```rust
/// Cursor for iterating a mask-backed [`RowSelection`]
///
/// This is best for dense selections where there are many small skips
/// or selections. For example, selecting every other row.
#[derive(Debug)]
pub struct MaskCursor {
    mask: BooleanBuffer,
    /// Current absolute offset into the selection
    position: usize,
}
```

to also track what ranges should be skipped entirely. Maybe something like

```rust
#[derive(Debug)]
pub struct MaskCursor {
    mask: BooleanBuffer,
    /// Current absolute offset into the selection
    position: usize,
    /// Which row ranges should be skipped entirely?
    skip_ranges: Vec<Range<usize>>,
}
```

That I think would simplify the logic for next_mask_chunk significantly, and it would avoid the need to copy the entire page index.
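A rough sketch of how `next_mask_chunk` might then consume those precomputed ranges (the `advance_past_skips` helper is hypothetical; it assumes `skip_ranges` is sorted and non-overlapping):

```rust
use std::ops::Range;

/// If the cursor landed inside a range that should be skipped entirely
/// (e.g. a page with no selected rows), jump to the end of that range
/// without decoding anything.
fn advance_past_skips(position: &mut usize, skip_ranges: &[Range<usize>]) {
    if let Some(range) = skip_ranges.iter().find(|r| r.contains(position)) {
        *position = range.end;
    }
}
```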
Thank you! @sdf-jkl, the code looks great; just wondering if we could add more unit tests.
Here's the existing test: arrow-rs/parquet/src/arrow/async_reader/mod.rs line 1218 in 13d497a
I think we can just add one more UT to test skipping pages with RowSelectionPolicy set to Mask instead of Auto
alamb left a comment
Thank you @sdf-jkl and @hhhizzz -- I took a look at this PR and it is looking like it is heading in the right direction
I had some structural suggestions and I also have an idea for some additional coverage (related to predicates).
Please let me know if you are willing to work on this, otherwise I am happy to take over this PR as well (given we are hitting the problem at work, and it is blocking our upgrade)
```rust
)
}) {
self.row_group_offset_index(row_group_idx)
    .and_then(|columns| columns.first())
```
I think this is a bug -- it reads the page offsets from the first column rather than the column being read
Maybe something like
```rust
self.row_group_offset_index(row_group_idx).and_then(|columns| {
    columns
        .iter()
        .enumerate()
        .find(|(leaf_idx, _)| self.projection.leaf_included(*leaf_idx))
        .map(|(_, column)| column.page_locations())
})
```
Wouldn't the page offsets be the same for every column? It is, thanks!
I think even this should not work, because we actually need to keep page offsets for all projected columns and use them in ReadPlanBuilder (once we move it from ParquetRecordBatchReader)
So I guess we go back to using the whole &[OffsetIndexMetaData]
I plan to find some time this afternoon to work on this PR -- maybe I will come up with something
Another issue with the current implementation is that ParquetRecordBatchReader operates page-aware using page offsets from a single column.
However, the read happens for all columns at once, using the same boolean mask (which is column-chunk specific).
https://github.com/apache/arrow-rs/pull/9118/changes#diff-850b3a44587149637b8545f66603a2b1252959fd36f7ddc55f37d6b5357816c6L1403
It seems that supporting different page offsets for each column would require us to push page awareness further down into the arrow readers.
```diff
 while !mask_cursor.is_empty() {
-    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
+    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations)
```
I expect that this API needs to be extended -- it needs to be able to represent "skip the next N rows without trying to decode them"
As written here I think the first page that doesn't have any rows selected will return None (which will trigger the reader to think it is at the end of the file, even if there is data left)
The reader only thinks it's the end of the file when no further rows remain in mask_cursor. An empty page is handled by the initial skip in next_mask_chunk.
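A toy illustration of that "initial skip" behaviour (hypothetical and heavily simplified from the real next_mask_chunk): leading unselected rows are consumed before a chunk is built, so a fully-unselected page does not terminate iteration.

```rust
use arrow_buffer::BooleanBuffer;

/// Advance `position` past leading unselected rows before building the
/// next chunk. If an entire page is unselected, the cursor simply moves
/// beyond it rather than returning None.
fn initial_skip(mask: &BooleanBuffer, position: &mut usize) {
    while *position < mask.len() && !mask.value(*position) {
        *position += 1;
    }
}
```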
Definitely willing to work on this, thanks for the review and your input!
Awesome -- thanks @sdf-jkl -- I will switch focus for the rest of today and check back in tomorrow.
```rust
let props = WriterProperties::builder()
    .set_write_batch_size(2)
    .set_data_page_row_count_limit(2)
    .build();
```
I think the reason the tests pass is that page offsets are the same for every column.
We limit pages by row_count, not by size.
It's actually the same in the new test too...
I think in order to get different page offsets we will need to use a data page byte limit and then different page sizes.
I wonder how dict/RLE encodings can affect this. Would having the first page be a dictionary page, or compressing multiple pages into one with RLE, make a difference in offsets?
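A sketch of writer settings that should force diverging page boundaries between columns, assuming the existing `WriterProperties` builder (the specific limits are arbitrary): with a byte-based limit, a wide Utf8 column fills pages after fewer rows than a narrow Int32 column.

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // The page size limit is only checked between write batches, so a
    // batch size of 1 makes the byte limit take effect as early as possible.
    let _props = WriterProperties::builder()
        .set_write_batch_size(1)
        // Small byte limit: wide columns (e.g. Utf8) close pages after
        // fewer rows than narrow ones (e.g. Int32), so per-column page
        // offsets diverge.
        .set_data_page_size_limit(64)
        .build();
}
```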
@alamb It seems like I'm on to something with codex. The test passes, but I want to give it a read and a little polish first before sending it your way.
It seems like the issue was caused by different-sized pages after all. Bigger types would have more, smaller, "finer" pages, and smaller types would have fewer, bigger, "coarser" pages. If the column with coarse pages was used to enable page awareness, we would use its page offsets. In the example above, col A with "coarse" pages overlaps with "finer" pages in col B that were skipped during data fetch. This led to the invalid offsets issue.
```rust
/// Add offset index metadata for each column in a row group to this `ReadPlanBuilder`
pub fn with_offset_index_metadata(
```
Using the offsets of the column with the smallest number of rows per page should prevent the invalid offset issue from happening.
I came up with a counterexample where taking offsets from the column with the finest pages doesn't work.
```
                      ┏━━━━┓  ┌────────┐             ┌────────┐
- '1' means selected  ┃ 0  ┃  │ Row 0  │             │ Row 0  │
- '0' means filtered  ┃ 0  ┃  │ Row 1  │  A Page 0   │ Row 1  │
                      ┃ 0  ┃  │ Row 2  │  (skipped)  │ Row 2  │
                      ┃    ┃  └────────┘             │ Row 3  │  B Page 0
                      ┃ 1  ┃  ┌────────┐             └────────┘
                      ┃ 1  ┃  │ Row 3  │  A Page 1   ┌────────┐
                      ┃ 1  ┃  │ Row 4  │  (fetched)  │ Row 4  │
                      ┃ 0  ┃  │ Row 5  │             │ Row 5  │
                      ┃    ┃  └────────┘             │ Row 6  │  B Page 1 (skipped)
                      ┃ 0  ┃  ┌────────┐             │ Row 7  │
                      ┃ 0  ┃  │ Row 6  │  A Page 2   └────────┘
                      ┃ 0  ┃  │ Row 7  │             ┌────────┐
                      ┃ 0  ┃  │ Row 8  │             │ Row 8  │  B Page 2 (skipped)
                      ┃    ┃  └────────┘             │ Row 9  │
                      ┗━━━━┛                         └────────┘
```
Mask chunking uses A's finest boundary:
- At mask_start = row 3, next A page boundary = row 6
- Chunk reads rows 3–5
But Column B has 4-row pages:
- rows 0–3 in B Page 0 (fetched)
- rows 4–7 in B Page 1 (skipped)
→ rows 4–5 are in a skipped B page → invalid offset
Could go back to creating a vec of all page offsets and looking up the closest page end for a mask chunk.
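A sketch of that fallback, assuming we can collect the page-start rows of every projected column up front (`merged_boundaries` is an illustrative name): merge them into one sorted, deduplicated list, so that chunking on these boundaries never crosses a page edge in any column.

```rust
/// Merge per-column page-start rows into one sorted, deduplicated list.
/// A mask chunk that ends on one of these merged boundaries cannot
/// straddle a skipped page in *any* projected column.
fn merged_boundaries(per_column_page_starts: &[Vec<usize>]) -> Vec<usize> {
    let mut all: Vec<usize> = per_column_page_starts.iter().flatten().copied().collect();
    all.sort_unstable();
    all.dedup();
    all
}
```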
I plan to work on this PR carefully today |
Checking it out again |
```rust
/// This test creates a parquet file with multiple small pages and verifies that
/// when using Mask policy, pages that are skipped entirely are handled correctly.
#[test]
fn test_bitmask_page_aware_selection() {
```
I found these tests really hard to read as they have so much boilerplate; what they are doing is obscured by all the repetitious mechanics.
I will push a suggestion to reduce the duplication
I agree; I thought about replacing the tests with a sync version of #9243
I think as a follow-on we should maybe split up the tests as well, as described here
```rust
/// Row ranges to be selected from the data source
row_selection_cursor: RowSelectionCursor,
/// Precomputed page boundary row indices for mask chunking
page_boundaries: Option<Arc<[usize]>>,
```
I have been digging around -- and I think the page_start_boundaries which is already calculated might be what we need. I will see if I can find some way to reuse it
alamb left a comment
Thank you (again) for this work @sdf-jkl -- this is a very non-trivial piece of work.
Some thoughts after staring at this for several hours:
I can't quite convince myself this PR is correct for all cases, especially when there are multiple different distributions of pages across columns. The tests in this PR use two int columns with the same page limit, so it isn't clear to me that they cover the case when data page offsets differ between columns.
I wonder if you would be willing to help write some other tests for this case? Maybe take the regression test that @erratic-pattern made and evaluate predicates on the different columns or something.
I am also thinking maybe we can move the reader tests to parquet/tests/arrow_reader/row_filter.rs to try and reduce the size of the new code we are adding to parquet/src/arrow/arrow_reader/mod.rs
I have gotten a good start on using page_start_boundaries though I am not quite done. If I can get that to work out I was thinking I would try and pull page_start_boundaries into a struct (so it can be better documented / easier to verify it is correct -- e.g. are the column indexes before or after projection?)
So TLDR:
- I think we need some more tests for predicates with multi-column chunks
- I think we can use page_start_boundaries with some more finagling
I am working on 2 - if I can get that going, I'll then move on to 1 if no one beats me to it.
I will refrain from pushing more commits directly to this PR -- instead I'll make PRs into it
```rust
// or until the mask is exhausted. This mirrors the behaviour of the legacy
// `RowSelector` queue-based iteration.
while cursor < mask.len() && selected_rows < batch_size {
    let max_chunk_rows = page_boundaries
```
since the boundaries are all sorted, we should be able to avoid this sort/partition point...
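One way to exploit that ordering, sketched with hypothetical names: keep an index into the sorted boundary list that only moves forward, so a whole scan does O(n) total work instead of a binary search per batch.

```rust
/// Hypothetical cursor over sorted page boundaries; `idx` never moves
/// backwards because the row position is monotonically increasing.
struct BoundaryCursor {
    boundaries: Vec<usize>,
    idx: usize,
}

impl BoundaryCursor {
    /// Return the first boundary strictly after `row`, or `total_rows`
    /// once the boundaries are exhausted. Amortized O(1) per call.
    fn next_after(&mut self, row: usize, total_rows: usize) -> usize {
        while self.idx < self.boundaries.len() && self.boundaries[self.idx] <= row {
            self.idx += 1;
        }
        self.boundaries.get(self.idx).copied().unwrap_or(total_rows)
    }
}
```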
Thanks, I understand; this issue got me thinking about it in my sleep. I agree that the tests with two int columns do not cover different distributions of pages. Originally, they were meant to cover the case where the mask was not page-aware at all. The new #9243 test covers that scenario and also checks different page distributions, which seemingly makes the old tests redundant. The #9243 test covers different page distributions despite also using a page limit because one column is utf8. When building the row groups, the arrow writer is smart and will use dictionary encoding on that column. This adds a dictionary page at the beginning of the column chunk and creates an offset between pages.
This is the mark of a great software engineer, in my opinion. Just don't lose too many 💤 |
Which issue does this PR close?
Rationale for this change
Check issue.
What changes are included in this PR?
Made `next_mask_chunk` page aware by adding `page_offsets` to `ParquetRecordBatchReader`.
Are these changes tested?
Should be covered by existing tests from #8733
Are there any user-facing changes?