Track peak_mem_used in ExternalSorter #16192

Draft: wants to merge 1 commit into base: main

ding-young
Contributor

@ding-young ding-young commented May 27, 2025

Which issue does this PR close?

Questions & Thoughts

  • There are other paths that do not build an ExternalSorter when execute() is called in SortExec, e.g., TopK. Do we need to track peak_mem_used in these paths, too?
  • Do we need to include merge_reservation in peak_mem_used? It serves as headroom and is freed before in_mem_sort_stream requests an allocation, so I updated peak_mem_used only with reservation, not merge_reservation.
  • Maybe it would be better to call human_readable_bytes when displaying this metric.
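
To make the second question concrete, here is a minimal sketch of high-water-mark tracking on a single reservation. `TrackedReservation` and its methods are hypothetical stand-ins written for illustration; they are not DataFusion's `MemoryReservation` API, and the headroom reservation is simply not fed into the tracker.

```rust
/// Hypothetical stand-in for a memory reservation paired with a peak metric.
/// Illustrative only; not DataFusion's `MemoryReservation`.
#[derive(Debug, Default)]
struct TrackedReservation {
    /// Bytes currently reserved.
    current: usize,
    /// Largest value `current` has ever reached.
    peak: usize,
}

impl TrackedReservation {
    /// Grow the reservation and update the high-water mark.
    fn grow(&mut self, bytes: usize) {
        self.current += bytes;
        self.peak = self.peak.max(self.current);
    }

    /// Shrink the reservation; the recorded peak is intentionally untouched.
    fn shrink(&mut self, bytes: usize) {
        self.current = self.current.saturating_sub(bytes);
    }

    /// Set the reservation to an exact size, updating the peak if it grew.
    fn resize(&mut self, bytes: usize) {
        self.current = bytes;
        self.peak = self.peak.max(self.current);
    }
}
```

The point of the sketch is that every path that can enlarge the reservation (growing or resizing) has to update the peak, while shrinking never lowers it.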

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label May 27, 2025
Contributor

@2010YOUY01 2010YOUY01 left a comment

  • There are other paths that do not build an ExternalSorter when execute() is called in SortExec, e.g., TopK. Do we need to track peak_mem_used in these paths, too?

Yes. I believe that, other than ExternalSorter, only the TopK path buffers intermediate data. Let's create an issue for this task.

  • Do we need to include merge_reservation in peak_mem_used? It serves as headroom and is freed before in_mem_sort_stream requests an allocation, so I updated peak_mem_used only with reservation, not merge_reservation.

I agree that merge_reservation should not be included, since it is just a placeholder rather than memory actually in use. However, I think we should also count the memory used in the later merge phase. SortExec executes in two stages: the first uses ExternalSorter, and the second is a SortPreservingMerge (SPM). The SPM stage generates new data (batches converted to row format to accelerate value comparison), and we want to report the maximum memory usage across the operator's entire life cycle. Here is an example:
Let's say we have a SortExec (with 1 partition) handling 10 input batches. The execution process is:

  1. Reserve 10MB for merge_reservation() // peak_mem -- 0M
  2. Read input and buffer 10 batches (5MB each) // peak_mem -- 50M
  3. SPM step (each batch's converted row format is 2MB) // peak_mem -- 70M
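
The arithmetic behind these three steps can be sketched as below. The function name and parameters are hypothetical, and the sketch assumes every buffered batch is still alive while its row-format copy exists, which is the assumption the 70M figure relies on.

```rust
const MB: usize = 1024 * 1024;

/// Returns (bytes held after buffering, bytes held during the merge step),
/// assuming all buffered batches stay alive while their row copies exist.
/// The headroom reservation is excluded because it is only a placeholder.
fn example_usage(num_batches: usize, batch_bytes: usize, row_bytes: usize) -> (usize, usize) {
    let after_buffering = num_batches * batch_bytes;
    let during_merge = after_buffering + num_batches * row_bytes;
    (after_buffering, during_merge)
}
```

Calling `example_usage(10, 5 * MB, 2 * MB)` gives 50MB after buffering and 70MB during the merge step, matching the comments on steps 2 and 3.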
  • Maybe it would be better to call human_readable_bytes when displaying this metric.

This is a good idea. Perhaps we can do it in a separate PR and change it for all operators with peak_memory_used metrics.
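
For readers unfamiliar with the idea, a byte-count formatter could look roughly like the sketch below. `format_bytes` is a hypothetical helper written for this illustration; it is not the `human_readable_bytes` function mentioned above, and the choice of binary units and one decimal place is an assumption.

```rust
/// Hypothetical formatter: renders a byte count with binary (1024-based) units.
fn format_bytes(bytes: usize) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
    let mut value = bytes as f64;
    let mut unit = 0;
    // Divide down until the value fits the current unit or units run out.
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    if unit == 0 {
        // Plain bytes are printed without a fractional part.
        format!("{} {}", bytes, UNITS[0])
    } else {
        format!("{:.1} {}", value, UNITS[unit])
    }
}
```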

To summarize, I suggest changing the implementation to also count the memory used in the SPM phase, adding some tests, and opening issues for the other TODOs.

@@ -658,6 +664,8 @@ impl ExternalSorter {
self.reservation
.try_resize(get_reserved_byte_for_record_batch(&batch))
.map_err(Self::err_with_oom_context)?;
// TODO(ding-young) can reservation grow here?
Contributor

I think so. Concatenating small batches into a single large batch allocates new buffers and copies the payloads into them, so the size might change.

@ding-young
Contributor Author

@2010YOUY01
Hi, I’ve been struggling a bit with tracking peak memory in SPM step, and I was wondering if I could ask for some help.

1. Can we add the memory for converted (row) batches to previous peak_mem_used?

Since ExternalSorter creates a SortPreservingMergeStream for the second step (SPM), I tried updating the peak memory metric inside maybe_poll_stream in SortPreservingMergeStream (which internally calls poll_next, where convert_batch is done, and pushes batches into a BatchBuilder).
But here’s my concern: if we keep adding the new reservation from this second step to the previous peak memory value, we might be overestimating. That’s because by the time the second step runs, some batches from the first step might have already been dropped. So, summing them might inflate the reported peak memory.
I tried printing the total reserved size from the global memory pool manually (with tons of println) during execution, and it seems like there was a difference between the first and second steps, but it didn’t seem as large as the total size of all converted batches combined.

2. Parent Operator's memory reservation

Also, when the parent operator (e.g., SortPreservingMergeExec) executes, the reservation created by the earlier SortExec is not yet released. In this case, should SortPreservingMergeExec only track the peak memory of its own reservation?

And please let me know if I’ve misunderstood when the reservation is supposed to be dropped. Maybe that’s where my confusion is coming from.

@2010YOUY01
Contributor

@ding-young I also think the code to manage reservation + merge_reservation is tricky. I'll try to answer your questions by adding more doc about memory reservation management inside SortExec this week. Let's move on to other tasks for now.

@2010YOUY01
Contributor

I added some background docs #16289, but gotta head out now — I’ll get back to the ExternalSorter part and address your questions in the next few days.

@ding-young
Contributor Author

@2010YOUY01 Thanks for your help! I’m currently working on a different issue (spill file compression option) meanwhile. Feel free to ping me if you'd like me to clarify any of the questions or if anything else comes up.

@2010YOUY01
Contributor

@2010YOUY01 Hi, I’ve been struggling a bit with tracking peak memory in SPM step, and I was wondering if I could ask for some help.

1. Can we add the memory for converted (row) batches to previous peak_mem_used?

Since ExternalSorter creates a SortPreservingMergeStream for the second step (SPM), I tried updating the peak memory metric inside maybe_poll_stream in SortPreservingMergeStream (which internally calls poll_next, where convert_batch is done, and pushes batches into a BatchBuilder). But here’s my concern: if we keep adding the new reservation from this second step to the previous peak memory value, we might be overestimating. That’s because by the time the second step runs, some batches from the first step might have already been dropped. So, summing them might inflate the reported peak memory.

I think the correct way to implement it is, for each input batch, to compute peak_mem = max(peak_mem, sum_all_related_reservations_current_size()). This should not overestimate; if it does, that would be a bug.
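
A minimal sketch of that rule, assuming the caller can read the current size of every related reservation at each input batch. `PeakTracker` and `observe` are hypothetical names, not part of DataFusion.

```rust
/// Hypothetical tracker for the peak of the combined size of several reservations.
#[derive(Debug, Default)]
struct PeakTracker {
    peak: usize,
}

impl PeakTracker {
    /// Call once per input batch with the *current* size of each related
    /// reservation. Returns the peak seen so far.
    fn observe(&mut self, reservation_sizes: &[usize]) -> usize {
        // Sum what is held right now, then keep the running maximum.
        let total: usize = reservation_sizes.iter().sum();
        self.peak = self.peak.max(total);
        self.peak
    }
}
```

Because each observation sums the sizes held at the same moment, memory that was already released by the first step is not counted again. Observing `[50, 0]`, then `[30, 40]`, then `[10, 20]` yields a peak of 70, whereas adding the per-reservation maxima (50 + 40) would report 90.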

I tried printing the total reserved size from the global memory pool manually (with tons of println) during execution, and it seems like there was a difference between the first and second steps, but it didn’t seem as large as the total size of all converted batches combined.

The reason might be that we're currently using a hack: when buffering batches, we reserve 2X the batch's memory size so that the SPM step won't run out of memory.

fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {

To correct my previous example:

Let's say we have a SortExec (with 1 partition) to handle 10 input batches, the execution process is:

  1. Reserve 10MB for merge_reservation() // peak_mem -- 0M
  2. Read input and buffer 10 batches (5MB each). Here we use 2X estimate so that later SPM step will have enough extra memory for `Row`s // peak_mem -- 100M
  3. SPM step (each batch's converted row format is 2MB) // peak_mem -- 70M
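
The corrected numbers can be checked with the sketch below. It assumes the reservation is exactly twice the batch's in-memory size, as described in this thread; the helper names are hypothetical and the real `get_reserved_byte_for_record_batch` body is not reproduced here.

```rust
const MB: usize = 1024 * 1024;

/// Assumed 2X rule from the discussion: reserve twice the batch's size.
fn reserved_with_headroom(batch_bytes: usize) -> usize {
    batch_bytes * 2
}

/// Returns (bytes reserved while buffering, bytes actually needed while merging).
fn reserved_vs_needed(num_batches: usize, batch_bytes: usize, row_bytes: usize) -> (usize, usize) {
    let reserved = num_batches * reserved_with_headroom(batch_bytes);
    // During the merge, each batch plus its row-format copy is alive.
    let needed = num_batches * (batch_bytes + row_bytes);
    (reserved, needed)
}
```

With 10 batches of 5MB and 2MB of rows per batch, this gives 100MB reserved against 70MB needed, so the peak recorded during buffering already covers the merge step, as long as the row format is no larger than the batch itself.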

2. Parent Operator's memory reservation

Also, when the parent operator (e.g., SortPreservingMergeExec) executes, the reservation created by the earlier SortExec is not yet released. In this case, should SortPreservingMergeExec only track the peak memory of its own reservation?

I think the buffering step will always temporarily drop the reservation first, and later let SPM reserve it back again 🤔
Here is an example that temporarily drops the reservation in the sort-buffering step


If you notice anything not released it might be a bug.

Due to this 2x estimation hack in the implementation, I now think tracking peak memory in the SPM step is unnecessary. The current implementation, with more tests, should be good to go.

Labels
physical-plan Changes to the physical-plan crate
2 participants