-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Track peak_mem_used in ExternalSorter #16192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- There are other paths that does not build
ExternalSorter
whenexecute()
is called inSortExec
. i.e, TopK. Do we need to trackpeak_mem_used
in these paths, too?
Yes, I believe other than ExternalSorter
, only TopK
path will buffer intermediate data, let's create an issue for this task.
- Do we need to include
merge_reservation
inpeak_mem_used
? Since it serves as a headroom value and gets freed beforein_mem_sort_stream
requests an allocation, so I just updated thepeak_mem_used
only withreservation
, notmerge_reservation
.
I agree that merge_reservation
should not be included, since it's just a placeholder instead of the actual memory used. However I think we should also count the memory used in the later merge phase. The reason is: SortExec
will execute in 2 stages, the 1st stage will be using ExternalSorter
, and the second stage is SortPreservingMerge
. SPM stage will generate new data (converted row format to accelerate value comparison) and we want to report the maximum memory usage across its entire life cycle, here is an example:
Let's say we have a SortExec (with 1 partition) to handle 10 input batches, the execution process is:
- Reserve 10MB for
merge_reservation()
// peak_mem -- 0M - Read input and buffer 10 batches (5MB each) // peak_mem -- 50M
- SPM step (each batch's converted row format is 2MB) // peak_mem -- 70M
- Maybe it would be better to call
human_readable_bytes
when displaying this metric.
This is a good idea, perhaps we can do it in a separate PR and change it for all operators with peak_memory_used
metrics.
To summarize, I suggest to change the implementation to also count the memory used in SPM phase, also add some tests, and open issues for other TODOs.
@@ -658,6 +664,8 @@ impl ExternalSorter { | |||
self.reservation | |||
.try_resize(get_reserved_byte_for_record_batch(&batch)) | |||
.map_err(Self::err_with_oom_context)?; | |||
// TODO(ding-young) can reservation grow here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. After concatenating small batches into a single large batch, it will allocate new buffers and copy payloads into it, so the size might change.
@2010YOUY01 1. Can we add the memory for converted (row) batches to previous
|
@ding-young I also think the code to manage |
I added some background docs #16289, but gotta head out now — I’ll get back to the |
@2010YOUY01 Thanks for your help! I’m currently working on a different issue (spill file compression option) meanwhile. Feel free to ping me if you'd like me to clarify any of the questions or if anything else comes up. |
I think the correct way to implement it is for each input batch, do
The reason might be we're currently using a hack: when buffering batches use 2X memory size for reservation, so that the SPM step won't run out of memory.
To correct my previous example:
I think the buffering step will always temporary drop the reservation first, and later let
If you notice anything not released it might be a bug. Due to this 2x estimation hack in the implementation, I now think tracking peak memory in the SPM step is unnecessary. The current implementation, with more tests, should be good to go. |
Which issue does this PR close?
SortExec
#16042Questions & Thoughts
ExternalSorter
whenexecute()
is called inSortExec
. i.e, TopK. Do we need to trackpeak_mem_used
in these paths, too?merge_reservation
inpeak_mem_used
? Since it serves as a headroom value and gets freed beforein_mem_sort_stream
requests an allocation, so I just updated thepeak_mem_used
only withreservation
, notmerge_reservation
.human_readable_bytes
when displaying this metric.Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?