
Conversation

@wkalt (Contributor) commented Jan 20, 2026

Lance's read pipeline is split into independent scheduling and decoding loops, designed to maximize both IO and CPU parallelism.

When we schedule ranges of rows to read, the scheduler breaks those ranges up into "scan lines" and issues requests against the storage backend at the maximum parallelism advertised by the backend. For each scan line, a message is sent over an unbounded mpsc channel to the decoder. The message includes futures that wrap the results of the column download operations.
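In rough pseudocode, the current shape looks something like the sketch below (`ScanLineMessage`, `ColumnData`, and `spawn_scheduler` are illustrative stand-ins, not Lance's actual types):

```rust
use futures::future::BoxFuture;
use tokio::sync::mpsc;

// Illustrative stand-ins, not Lance's real types.
struct ColumnData(Vec<u8>);

struct ScanLineMessage {
    // One future per column; each resolves when its download completes.
    downloads: Vec<BoxFuture<'static, ColumnData>>,
}

fn spawn_scheduler(
    scan_lines: Vec<ScanLineMessage>,
) -> mpsc::UnboundedReceiver<ScanLineMessage> {
    // Unbounded: the scheduler never waits for the decoder to catch up,
    // so completed downloads can pile up in the channel.
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        for line in scan_lines {
            // IO was already issued (bounded only by the store's parallelism);
            // the wrapping futures are handed straight to the decoder.
            let _ = tx.send(line);
        }
    });
    rx
}
```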

Although requests against storage are limited by the advertised capabilities of the store, the limit applies only to in-flight IO requests. Nothing stops the results of completed IO requests from piling up in memory ahead of decoding. This works fine when IO is slower than decoding, but when IO is significantly faster than decoding (for example, RAM- or NVMe-backed storage, or particularly slow decoding tasks), we can accumulate large amounts of downloaded data in memory well before we need it.

This patch introduces a semaphore-based backpressure mechanism. Permits are acquired prior to scheduling and released when the decoding futures complete, limiting the amount of work that can accumulate between the scheduler and the consumer.
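A minimal sketch of the mechanism, assuming tokio's `Semaphore` and illustrative type names rather than the exact types touched by this patch:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

// Illustrative types: the permit travels with the scan line and is only
// released once the decoder has finished with it.
struct ScanLineMessage {
    data: Vec<u8>,                 // stand-in for downloaded column data
    _permit: OwnedSemaphorePermit, // returned to the semaphore on drop
}

async fn schedule(
    scan_lines: Vec<Vec<u8>>,
    tx: mpsc::UnboundedSender<ScanLineMessage>,
    limiter: Arc<Semaphore>,
) {
    for data in scan_lines {
        // Blocks here once the permits are exhausted, i.e. once too many
        // undecoded scan lines are already outstanding.
        let permit = limiter
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        let _ = tx.send(ScanLineMessage { data, _permit: permit });
    }
}

async fn decode(mut rx: mpsc::UnboundedReceiver<ScanLineMessage>) {
    while let Some(msg) = rx.recv().await {
        // ... decode msg.data ...
        drop(msg); // dropping the message releases its permit to the scheduler
    }
}
```

With a semaphore of N permits, at most N scan lines of downloaded-but-undecoded data can sit between the scheduler and the decoder, regardless of how much faster the IO completes.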

@github-actions bot added the enhancement (New feature or request) label on Jan 20, 2026
@wkalt (Contributor, Author) commented Jan 20, 2026

This is in WIP/discussion state -- there are some unresolved questions.

One big question is how to make this work optimally when working against a storage-backed memory cache, which is the original reason for the experiment. The problem is, some requests will behave like object storage and some will behave like memory. We may need something smarter/more adaptive than a fixed limit on the channel size to accommodate this. It may be useful to allow the caller to bring their own semaphore, in order to keep this complexity out of lance itself.
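One possible shape for a caller-supplied limiter (purely hypothetical, not an existing Lance API):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical scan option, not part of Lance today.
pub struct ScanBackpressure {
    // None => today's unbounded behavior.
    limiter: Option<Arc<Semaphore>>,
}

impl ScanBackpressure {
    pub fn unbounded() -> Self {
        Self { limiter: None }
    }

    // The caller sizes the limiter for its workload, or shares one
    // semaphore across many concurrent scans to make the bound global.
    pub fn with_limiter(limiter: Arc<Semaphore>) -> Self {
        Self { limiter: Some(limiter) }
    }
}
```

A process running many concurrent scans could then hand the same `Arc<Semaphore>` to each of them, making the limit global rather than per-request.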

Also, I'm not sure if the IO latency simulation in the benchmark is good enough. If we merge this we will probably want to scale the benchmark back and make it a bit more targeted.

Here are the current textual benchmark results ("unbounded" is current behavior):

=== I/O Latency: 0ms ===
  latency= 0ms p= 4 unbounded      1771346 rows/s, 423.0 MB, io_reqs=1
  latency= 0ms p= 4 bounded_c=2    4831215 rows/s,  24.4 MB, io_reqs=98
  latency= 0ms p= 4 bounded_c=4    1206624 rows/s,  48.4 MB, io_reqs=98
  latency= 0ms p= 4 bounded_c=8    1240292 rows/s,  48.4 MB, io_reqs=98
  latency= 0ms p= 4 bounded_c=16   1207817 rows/s,  48.4 MB, io_reqs=98

=== I/O Latency: 1ms ===
  latency= 1ms p= 4 unbounded      1808342 rows/s, 423.0 MB, io_reqs=1
  latency= 1ms p= 4 bounded_c=2     462761 rows/s,  12.4 MB, io_reqs=98
  latency= 1ms p= 4 bounded_c=4     460657 rows/s,  12.4 MB, io_reqs=98
  latency= 1ms p= 4 bounded_c=8     456573 rows/s,  12.4 MB, io_reqs=98
  latency= 1ms p= 4 bounded_c=16    464966 rows/s,  12.4 MB, io_reqs=98

=== I/O Latency: 5ms ===
  latency= 5ms p= 4 unbounded      1668330 rows/s, 423.0 MB, io_reqs=1
  latency= 5ms p= 4 bounded_c=2     163463 rows/s,  12.4 MB, io_reqs=98
  latency= 5ms p= 4 bounded_c=4     162007 rows/s,  12.4 MB, io_reqs=98
  latency= 5ms p= 4 bounded_c=8     162050 rows/s,  12.4 MB, io_reqs=98
  latency= 5ms p= 4 bounded_c=16    162829 rows/s,  12.4 MB, io_reqs=98

=== I/O Latency: 50ms ===
  latency=50ms p= 4 unbounded       936137 rows/s, 423.0 MB, io_reqs=1
  latency=50ms p= 4 bounded_c=2      19863 rows/s,  12.4 MB, io_reqs=98
  latency=50ms p= 4 bounded_c=4      19869 rows/s,  12.4 MB, io_reqs=98
  latency=50ms p= 4 bounded_c=8      19873 rows/s,  12.4 MB, io_reqs=98
  latency=50ms p= 4 bounded_c=16     19872 rows/s,  12.4 MB, io_reqs=98

I validated the effectiveness against my real application's memory cache, but as mentioned, I think more work is required to cover the cold-data case.

@wkalt (Contributor, Author) commented Jan 20, 2026

Another issue with this patch is that it precludes some IO coalescing that is currently performed in the unbounded case. Ideally we would have some way to independently control the degree of coalescing and the degree of permitting/readahead, but currently the coalescing is done in the same step (scheduling) as the storage request dispatch. I was thinking it could make sense to split scheduling/decoding into plan/schedule/decode or schedule/execute/decode. Then the permitting could happen between plan and schedule, downstream of IO coalescing.
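A rough sketch of that staging, with hypothetical names, just to show where the permit would sit relative to coalescing:

```rust
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct PlannedRead {
    ranges: Vec<Range<u64>>, // already coalesced, no IO issued yet
}

struct ScheduledRead {
    planned: PlannedRead,
    _permit: OwnedSemaphorePermit, // held until decoding finishes
}

// Stage 1: plan and coalesce adjacent ranges without touching storage.
fn plan(ranges: Vec<Range<u64>>) -> PlannedRead {
    // ... coalescing happens here, independent of any permit ...
    PlannedRead { ranges }
}

// Stage 2: acquire the permit after coalescing, before dispatching IO.
async fn schedule(planned: PlannedRead, limiter: Arc<Semaphore>) -> ScheduledRead {
    let permit = limiter.acquire_owned().await.expect("semaphore closed");
    // ... dispatch the coalesced requests against the store ...
    ScheduledRead { planned, _permit: permit }
}

// Stage 3: decode; dropping the ScheduledRead afterwards releases the permit.
async fn decode(read: ScheduledRead) {
    let _ranges = &read.planned.ranges;
    // ... decode ...
}
```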

    let next_task = ReadBatchTask {
        task: task.boxed(),
        num_rows: num_rows as u32,
        backpressure_permits: Vec::new(), // Permits are now inside the task future
Member (review comment on the line above):

I think my preference would be to just remove the tokio::spawn call up above this point. Does that solve the backpressure issue? See this change here for an example: a9efde8
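For background on why dropping the spawn can matter (a general tokio/futures property, not a description of what a9efde8 does): a spawned task runs to completion whether or not anyone is consuming its output, whereas an unspawned future only makes progress when the consumer polls it, so a consumer-side limit such as `buffered(n)` becomes the effective backpressure. A small illustrative sketch:

```rust
use futures::{stream, StreamExt};

async fn produce(i: usize) -> usize {
    // stand-in for "download + decode one batch"
    i
}

async fn consume_with_backpressure() {
    // Eager: `tokio::spawn(produce(i))` would start every task immediately,
    // regardless of how fast we consume. Lazy: keeping the futures unspawned
    // and using `buffered(4)` means at most 4 are in flight at once.
    let results: Vec<usize> = stream::iter(0..100)
        .map(produce)  // futures are created but not yet running
        .buffered(4)   // polled at most 4 at a time, in order
        .collect()
        .await;
    assert_eq!(results.len(), 100);
}
```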

Contributor Author (reply):

Thanks, I will check this.

@westonpace (Member) commented:

Adding on to my brief comment. There is already a backpressure mechanism in the I/O scheduler. I would like to understand why this mechanism is not working before adding a new one.

In theory the scheduling thread submits requests to the scheduler as you have described. These requests first try to grab a reservation. There are two throttles: one throttles the number of bytes allowed in the I/O buffer per ScanScheduler; the other throttles the number of concurrent IOPS allowed (there is both a per-scheduler and a process-wide pool). If no reservation can be made, the request is placed in a queue.

Meanwhile, the consumer will try and pull tasks from the decoder. Once a task is retrieved from the decoder we assume the consumer has allocated resources to process that task. The reservations for the I/O utilized by the task are released. At this point we switch from the I/O pipeline (bound by the I/O buffer size) to the compute pipeline (bound by the batch size and compute parallelism).

How exactly compute parallelism is managed varies by approach. In filtered read there are two options but we always use OnePartitionMultipleThreads which has a try_buffered call to pull tasks from the decoder. It uses the number of CPUs as the concurrency. So we could be limited to num_cpus * batch_size bytes.
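For readers following along, the general shape of such a two-throttle reservation looks roughly like this (an illustrative sketch, not lance-io's actual scheduler types):

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Generic sketch of a two-throttle reservation (not lance-io's real types):
// one pool bounds the bytes buffered for IO, the other bounds concurrent IOPS.
struct IoThrottle {
    bytes: Arc<Semaphore>, // e.g. one permit per KiB of I/O buffer
    iops: Arc<Semaphore>,  // number of concurrent requests allowed
}

struct IoReservation {
    _bytes: OwnedSemaphorePermit,
    _iops: OwnedSemaphorePermit,
}

impl IoThrottle {
    async fn reserve(&self, kib: u32) -> IoReservation {
        // If either pool is exhausted, the request waits here -- the queue
        // described above.
        let bytes = self.bytes.clone().acquire_many_owned(kib).await.unwrap();
        let iops = self.iops.clone().acquire_owned().await.unwrap();
        IoReservation { _bytes: bytes, _iops: iops }
    }
}

// Dropping the IoReservation (intended to happen when the consumer picks up
// the task) returns both the byte and IOPS capacity.
```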

- Is the issue that a scan scheduler is somehow exceeding its I/O buffer size?
- Do we perhaps have too many scan schedulers configured somewhere (each scheduler has its own buffer)?
- Is some consumer of a scheduler using too much readahead?
- Are batch sizes just too large?

@wkalt (Contributor, Author) commented Jan 21, 2026

@westonpace Thanks for taking a look. I will try your tweak in the next day or two, might be difficult to get time today.

> Once a task is retrieved from the decoder we assume the consumer has allocated resources to process that task. The reservations for the I/O utilized by the task are released.

I think this is correct in a sense, but the assumption creates a flaw in the backpressure mechanism. By my understanding:

  1. The "reservation" we are talking about is this bytes_avail: https://github.com/lance-format/lance/blob/main/rust/lance-io/src/scheduler.rs#L188-L191.
  2. The scheduler releases capacity by incrementing that here, in on_bytes_consumed: https://github.com/lance-format/lance/blob/main/rust/lance-io/src/scheduler.rs#L188-L191
  3. We call on_bytes_consumed here: https://github.com/lance-format/lance/blob/main/rust/lance-io/src/scheduler.rs#L771. This future gets executed lazily, in the next step.
  4. We pick that data up here: https://github.com/lance-format/lance/blob/main/rust/lance-encoding/src/encodings/logical/primitive.rs#L820. We await the future from step 3, returning the bytes reservation.
  5. We go on to decode the data.

The flaw is that at step 4, the bytes have not really been consumed. They have been copied into application memory and are about to get decoded -- which may require additional copies, allocations, and overhead. While that happens, we can keep repeating the steps, copying more data into memory.

That means the current bytes_avail mechanism is an effective gate on IO request bytes in flight, but it is not effective at protecting us from IO outpacing the consumer (which I think is the real intent).
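Condensed into a sketch (illustrative, not the actual Lance code paths):

```rust
use std::future::Future;

// Illustrative timeline, not the actual code paths.
async fn decode_one_scan_line(download: impl Future<Output = Vec<u8>>) {
    // Step 4: awaiting the download hands the bytes reservation back to the
    // I/O scheduler *now*, even though the data is still resident here.
    let bytes = download.await;

    // Step 5: decoding can take a while and allocate copies of its own.
    // Meanwhile the freed reservation lets the scheduler start (and finish)
    // more downloads whose results also sit in memory awaiting a decoder.
    decode(bytes);
}

fn decode(_bytes: Vec<u8>) { /* ... */ }
```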

Edit: regarding compute parallelism being a consumer limit, I wonder whether the producer limit (io buffer size) is global while the consumer limits are per-request. In my problematic scenario, I have a lot of concurrent requests. Intra-request limits on compute parallelism won't limit the global accumulation of buffers after step 4.

@westonpace (Member) commented:

> The flaw is that at step 4, the bytes have not really been consumed. They have been copied into application memory, and they are about to get decoded -- which may require additional copies, allocations, and overheads. While that happens we can keep repeating the steps, copying more data into memory.

True, they have moved from the I/O pipeline into the CPU pipeline. Still, if the number of concurrent batches we process in the CPU pipeline is limited, then the overall RAM is still limited (io_buffer_size + num_concurrent_batches * batch_size_bytes), right?
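(For a concrete sense of scale, with purely illustrative numbers: a 2 GiB io_buffer_size plus 16 concurrent batches of 64 MiB each would bound us at roughly 2 GiB + 16 * 64 MiB = 3 GiB.)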

Otherwise the question of "when should I return the bytes" can get quite tricky. We could do reference counting on the buffers, but what if I run an in-memory filter that removes one row? That will rewrite all the buffers, but they will still utilize the same amount of space. Now I'm back to io_buffer_size + num_concurrent_batches * batch_size_bytes. I think trying to tie the compute engine's RAM usage to the Scanner's RAM usage is probably a difficult task.
