Commit 04f217b
Speed up Parquet filter pushdown v4 (Predicate evaluation cache for async_reader) (apache#7850)
This is my latest attempt to make pushdown faster. Prior art: apache#6921
cc @alamb @zhuqi-lucas
- Part of apache#8000
- Related to apache/datafusion#3463
- Related to apache#7456
- Closes apache#7363
- Closes apache#8003
## Problems with apache#6921
1. It proactively loads the entire row group into memory, rather than only
loading the pages that pass the filter predicate.
2. It only caches decompressed pages, so we still pay the decoding cost
twice.
This PR takes a different approach: it does not change the decoding
pipeline, which avoids problem 1, and it caches decoded arrow record
batches rather than pages, which avoids problem 2.
The trade-off is that we need more memory to cache the data.
## How it works
1. It instruments the `array_readers` with a transparent
`cached_array_reader`.
2. The cache layer first consults the `RowGroupCache` for a batch, and only
reads from the underlying reader on a cache miss.
3. There is a cache producer and a cache consumer: while building filters we
insert arrow arrays into the cache, and while building the output we remove
them from the cache. The memory usage should therefore look like this:
```
▲
│ ╭─╮
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│╱ ╲
└─────────────╲──────► Time
│ │ │
Filter Peak Consume
Phase (Built) (Decrease)
```
In a concurrent setup, not all reader may reach the peak point at the
same time, so the peak system memory usage might be lower.
4. It has a max_cache_size knob, this is a per row group setting. If the
row group has used up the budget, the cache stops taking new data. and
the `cached_array_reader` will fallback to read and decode from Parquet.
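Below is a rough sketch of the consult-then-fallback flow described above. The trait and the `RowGroupCache`/`CachedColumnReader` types are simplified stand-ins with illustrative signatures, not the actual code in this PR:
```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use arrow_array::{Array, ArrayRef};

/// Minimal stand-in for an array reader: something that decodes the next
/// batch of one column from Parquet.
trait ColumnReader {
    fn next_batch(&mut self, batch_size: usize) -> ArrayRef;
}

/// Cache keyed by (column index, batch index) within a single row group.
#[derive(Default)]
struct RowGroupCache {
    entries: HashMap<(usize, usize), ArrayRef>,
    used_bytes: usize,
    /// Per-row-group budget; once exhausted the cache stops taking new data.
    max_cache_size: usize,
}

impl RowGroupCache {
    /// Producer side: called while evaluating filter predicates.
    fn insert(&mut self, key: (usize, usize), batch: ArrayRef) -> bool {
        let size = batch.get_array_memory_size();
        if self.used_bytes + size > self.max_cache_size {
            return false; // over budget: caller must re-decode later
        }
        self.used_bytes += size;
        self.entries.insert(key, batch);
        true
    }

    /// Consumer side: called while building the output; removing the entry
    /// is what makes memory usage fall again after the peak.
    fn take(&mut self, key: &(usize, usize)) -> Option<ArrayRef> {
        self.entries.remove(key)
    }
}

/// Transparent wrapper: consult the cache first, fall back to the inner
/// reader (decompress + decode) on a miss.
struct CachedColumnReader<R: ColumnReader> {
    inner: R,
    column_idx: usize,
    batch_idx: usize,
    cache: Arc<Mutex<RowGroupCache>>,
}

impl<R: ColumnReader> ColumnReader for CachedColumnReader<R> {
    fn next_batch(&mut self, batch_size: usize) -> ArrayRef {
        let key = (self.column_idx, self.batch_idx);
        self.batch_idx += 1;
        if let Some(cached) = self.cache.lock().unwrap().take(&key) {
            return cached; // cache hit: skip decompression and decoding
        }
        self.inner.next_batch(batch_size) // miss or budget exceeded
    }
}
```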
## Other benefits
1. This architecture can support nested columns (not implemented in this
PR), i.e., it is future proof.
2. There are many optimizations left to further squeeze out performance, but
even in its current state it has no regressions.
## How does it perform?
Criterion somehow would not produce a result from `--save-baseline`, so
I asked an LLM to generate a table from this benchmark:
```
cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
```
`Baseline` is the implementation on the current main branch.
`New Unlimited` is the new pushdown with an unlimited memory budget.
`New 100MB` is the new pushdown with a per-row-group cache budget of
100 MB.
```
Query | Baseline (ms) | New Unlimited (ms) | Diff (ms) | New 100MB (ms) | Diff (ms)
-------+--------------+--------------------+-----------+----------------+-----------
Q1 | 0.847 | 0.803 | -0.044 | 0.812 | -0.035
Q10 | 4.060 | 6.273 | +2.213 | 6.216 | +2.156
Q11 | 5.088 | 7.152 | +2.064 | 7.193 | +2.105
Q12 | 18.485 | 14.937 | -3.548 | 14.904 | -3.581
Q13 | 24.859 | 21.908 | -2.951 | 21.705 | -3.154
Q14 | 23.994 | 20.691 | -3.303 | 20.467 | -3.527
Q19 | 1.894 | 1.980 | +0.086 | 1.996 | +0.102
Q20 | 90.325 | 64.689 | -25.636 | 74.478 | -15.847
Q21 | 106.610 | 74.766 | -31.844 | 99.557 | -7.053
Q22 | 232.730 | 101.660 | -131.070 | 204.800 | -27.930
Q23 | 222.800 | 186.320 | -36.480 | 186.590 | -36.210
Q24 | 24.840 | 19.762 | -5.078 | 19.908 | -4.932
Q27 | 80.463 | 47.118 | -33.345 | 49.597 | -30.866
Q28 | 78.999 | 47.583 | -31.416 | 51.432 | -27.567
Q30 | 28.587 | 28.710 | +0.123 | 28.926 | +0.339
Q36 | 80.157 | 57.954 | -22.203 | 58.012 | -22.145
Q37 | 46.962 | 45.901 | -1.061 | 45.386 | -1.576
Q38 | 16.324 | 16.492 | +0.168 | 16.522 | +0.198
Q39 | 20.754 | 20.734 | -0.020 | 20.648 | -0.106
Q40 | 22.554 | 21.707 | -0.847 | 21.995 | -0.559
Q41 | 16.430 | 16.391 | -0.039 | 16.581 | +0.151
Q42 | 6.045 | 6.157 | +0.112 | 6.120 | +0.075
```
1. If we treat diffs within 5 ms as noise, we are never worse than the
current implementation.
2. We see significant improvements for string-heavy queries, because
string columns are large and take time to decompress and decode.
3. The 100 MB cache budget appears to have only a small performance impact.
## Limitations
1. It only works for async readers, because the sync reader does not follow
the same row-group-by-row-group structure.
2. It is memory hungry compared to apache#6921. However, changing the
decoding pipeline without eagerly loading the entire row group would require
significant changes to the current decoding infrastructure, e.g., making the
page iterator an async function.
3. It currently doesn't support nested columns; more specifically, it
doesn't support nested columns with nullable parents. Supporting them should
be straightforward and not require big changes.
4. The current memory accounting is not accurate: it overestimates memory
usage, especially when reading string view arrays, where multiple string
views may share the same underlying buffer and that buffer's size is counted
more than once. Either way, we never exceed the user-configured memory
budget. See the illustration after this list.
5. If one row in a batch passes the filter, the entire batch is cached. We
can probably optimize this later.
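To make limitation 4 concrete, here is a small illustration (not the PR's accounting code) of why per-array sizing can double-count for string views: two zero-copy slices of the same `StringViewArray` each report the full shared data buffer.
```rust
use arrow_array::builder::StringViewBuilder;
use arrow_array::Array;

fn main() {
    // Values longer than 12 bytes spill into a shared data buffer.
    let mut builder = StringViewBuilder::new();
    builder.append_value("this string is definitely longer than twelve bytes");
    builder.append_value("another string that is longer than twelve bytes");
    let array = builder.finish();

    // Slicing is zero-copy: both slices keep references to the same buffers.
    let first = array.slice(0, 1);
    let second = array.slice(1, 1);

    // Summing per-array sizes counts the shared buffer once per slice, so a
    // cache that accounts this way overestimates its real footprint.
    println!("whole array  : {} bytes", array.get_array_memory_size());
    println!(
        "sum of slices: {} bytes",
        first.get_array_memory_size() + second.get_array_memory_size()
    );
}
```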
## Next steps?
This PR is largely a proof of concept; I want to collect some feedback
before sending a multi-thousand-line PR :)
Some items I can think of:
1. Design an interface for users to specify the cache size limit; currently
it is hard-coded (see the sketch after this list).
2. Don't instrument the nested array reader if the Parquet file has a
nullable parent; currently this panics.
3. More testing, and integration tests/benchmarks with DataFusion.
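For item 1, one possible shape is a knob on the async reader builder. The method named in the comment below is purely hypothetical and does not exist today; only the surrounding builder calls are the current API:
```rust
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

async fn open_reader(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path).await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    // Hypothetical future knob (not a real method), e.g. something like:
    //     builder.with_max_predicate_cache_size(100 * 1024 * 1024)
    // Today the per-row-group cache limit is hard-coded inside the reader.
    let _stream = builder.build()?;
    Ok(())
}
```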
---------
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
12 files changed (+1869, −29) under `parquet/src/arrow/{array_reader, arrow_reader, async_reader}` and `parquet/tests/arrow_reader`.