Why Sort is row-based in Velox — A Quantitative Assessment

· 8 min read
Meng Duan (macduan)
Software Engineer @ ByteDance
Xiaoxuan Meng
Software Engineer @ Meta

TL;DR

Velox is a fully vectorized execution engine[1]. Its internal columnar memory layout enhances cache locality, exposes more inter-instruction parallelism to CPUs, and enables the use of SIMD instructions, significantly accelerating large-scale query processing.

However, some operators in Velox use a hybrid layout, where data can be temporarily converted to a row-oriented format. The OrderBy operator is one example: our implementation first materializes the input vectors into rows containing both the sort keys and the payload columns, sorts those rows, and then converts them back into vectors.

In this article, we explain the rationale behind this design decision and provide experimental evidence supporting it. We show a prototype of a hybrid sorting strategy that materializes only the sort-key columns, reducing the overhead of materializing payload columns. Contrary to expectations, the end-to-end performance did not improve; in fact, it was up to nearly 4x slower. We present the two variants and discuss why one is counter-intuitively faster than the other.

Row-based vs. Non-Materialized

Row-based Sort

The OrderBy operator in Velox’s current implementation uses a utility called SortBuffer to perform the sorting, which consists of three stages:

  1. Input Stage: Serializes the input columnar vectors into a row format, stored in a RowContainer.
  2. Sort Stage: Sorts the rows by the sort keys within the RowContainer.
  3. Output Stage: Extracts output vectors column by column from the RowContainer in sorted order (a simplified sketch of this flow follows the list).
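
To make the three stages concrete, here is a minimal, self-contained C++ sketch of the same flow. It deliberately uses plain STL containers instead of Velox's SortBuffer and RowContainer classes, so all type and variable names below are illustrative only:

    // Simplified model of the row-based sort: serialize columns into rows,
    // sort the rows by key, then extract output columns in sorted order.
    // Plain STL types stand in for Velox's SortBuffer/RowContainer.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using Column = std::vector<int64_t>;

    struct Row {
      int64_t key;                  // sort key
      std::vector<int64_t> payload; // payload values copied into the row
    };

    int main() {
      Column keys = {42, 7, 19, 7};
      Column pay0 = {100, 101, 102, 103};
      Column pay1 = {200, 201, 202, 203};

      // 1. Input stage: serialize the columnar input into rows.
      std::vector<Row> rows;
      for (size_t i = 0; i < keys.size(); ++i) {
        rows.push_back({keys[i], {pay0[i], pay1[i]}});
      }

      // 2. Sort stage: sort the rows on the key.
      std::sort(rows.begin(), rows.end(),
                [](const Row& a, const Row& b) { return a.key < b.key; });

      // 3. Output stage: extract output columns, in sorted order.
      Column outKeys, outPay0, outPay1;
      for (const Row& r : rows) {
        outKeys.push_back(r.key);
        outPay0.push_back(r.payload[0]);
        outPay1.push_back(r.payload[1]);
      }

      for (size_t i = 0; i < outKeys.size(); ++i) {
        std::cout << outKeys[i] << " " << outPay0[i] << " " << outPay1[i] << "\n";
      }
      return 0;
    }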

While row-based sorting is more efficient than column-based sorting[2,3], what if we only materialize the sort key columns? We could then use the resulting sort indices to gather the payload data into the output vectors directly. This would save the cost of converting the payload columns to rows and back again. More importantly, it would allow us to spill the original vectors directly to disk rather than first converting rows back into vectors for spilling.

Non-Materialized Sort

We have implemented a non-materializing sort strategy intended to improve sorting performance. The approach materializes only the sort key columns together with their original vector and row indices, which are then used to gather the corresponding rows from the original input vectors into the output vectors after the sort completes. It replaces SortBuffer with a NonMaterizedSortBuffer, which also consists of three stages:

  1. Input Stage: Holds a shared pointer to each input vector in a list, and serializes the key columns plus two additional index columns (VectorIndex and RowIndex) into rows stored in a RowContainer.
  2. Sort Stage: Sorts the rows by the sort keys within the RowContainer.
  3. Output Stage: Extracts the VectorIndex and RowIndex columns and uses them together to gather the corresponding rows from the original input vectors into the output vectors (see the sketch after the list).
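
As an illustration only (plain STL containers again, not the actual RowContainer or vector classes), here is a minimal sketch of the non-materializing variant: only the key and its (VectorIndex, RowIndex) pair are serialized, and the output is gathered from the retained input vectors after sorting:

    // Simplified model of the non-materializing sort: keep the input vectors,
    // sort only (key, vectorIndex, rowIndex) entries, then gather payload
    // values from the original vectors. Plain STL types, not Velox classes.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using Column = std::vector<int64_t>;

    struct InputVector {            // stands in for an input RowVector
      Column key;
      std::vector<Column> payload;  // payload columns, left untouched
    };

    struct KeyEntry {
      int64_t key;
      uint32_t vectorIndex;         // which input vector the row came from
      uint32_t rowIndex;            // row position inside that vector
    };

    int main() {
      // Two small input vectors with one payload column each.
      std::vector<InputVector> inputs = {
          {{42, 7}, {{100, 101}}},
          {{19, 7}, {{102, 103}}},
      };

      // 1. Input stage: serialize only keys plus (vectorIndex, rowIndex).
      std::vector<KeyEntry> entries;
      for (uint32_t v = 0; v < inputs.size(); ++v) {
        for (uint32_t r = 0; r < inputs[v].key.size(); ++r) {
          entries.push_back({inputs[v].key[r], v, r});
        }
      }

      // 2. Sort stage: sort the key entries only.
      std::sort(entries.begin(), entries.end(),
                [](const KeyEntry& a, const KeyEntry& b) { return a.key < b.key; });

      // 3. Output stage: gather values from the original input vectors.
      Column outKey, outPay;
      for (const KeyEntry& e : entries) {
        outKey.push_back(e.key);
        outPay.push_back(inputs[e.vectorIndex].payload[0][e.rowIndex]);
      }

      for (size_t i = 0; i < outKey.size(); ++i) {
        std::cout << outKey[i] << " " << outPay[i] << "\n";
      }
      return 0;
    }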

In theory, this should have significantly reduced the overhead of materializing payload columns, especially for wide tables, since only the sort keys are materialized. However, the benchmark results were the exact opposite of our expectations. Despite successfully eliminating the expensive serialization of payload columns and reducing the total instruction count, the end-to-end performance was 2x to 4x slower.

Benchmark Result

To validate the effectiveness of our new strategy, we designed a benchmark with a varying number of payload columns:

  • Inputs: 1000 Input Vectors, 4096 rows per vector.
  • Number of payload columns: 64, 128, 256.
  • L2 cache: 80 MiB, L3 cache: 108 MiB.
numPayloadColumns | Mode      | Input Time | Sorting Time | Output Time | Total Time | Desc
64                | Row-based | 4.27s      | 0.79s        | 4.23s       | 11.64s     | Row-based is 3.9x faster
64                | Columnar  | 0.28s      | 0.84s        | 42.30s      | 45.90s     |
128               | Row-based | 20.25s     | 1.11s        | 5.49s       | 31.43s     | Row-based is 2.0x faster
128               | Columnar  | 0.27s      | 0.51s        | 59.15s      | 64.20s     |
256               | Row-based | 29.34s     | 1.02s        | 12.85s      | 51.48s     | Row-based is 3.0x faster
256               | Columnar  | 0.87s      | 1.10s        | 144.00s     | 154.80s    |

The benchmark results confirm that Row-based Sort is the superior strategy, delivering a 2.0x to 3.9x overall speedup compared to Columnar Sort. While Row-based Sort incurs a significantly higher upfront cost during the Input phase (up to 29.34s at 256 payload columns), it keeps the Output phase stable and efficient (at most 12.85s). In contrast, Columnar Sort suffers severe performance degradation in the Output phase as the payload widens, with output times surging from 42.30s to 144.00s, resulting in a much slower total execution time despite its negligible input overhead.

To identify the root cause of the performance divergence, we utilized perf stat to analyze micro-architectural efficiency and perf mem to profile memory access patterns during the critical Output phase.

Metrics                            | Row-based     | Columnar      | Desc
Total Instructions                 | 555.6 Billion | 475.6 Billion | Row +17%
IPC (Instructions Per Cycle)       | 2.4           | 0.82          | Row 2.9x Higher
LLC Load Misses (Last Level Cache) | 0.14 Billion  | 5.01 Billion  | Columnar 35x Higher

Memory Level | Row-based Output | Columnar Output
RAM Hit      | 5.8%             | 38.1%
LFB Hit      | 1.7%             | 18.9%

The results reveal a stark contrast in CPU utilization. Although the Row-based approach executes 17% more instructions (due to serialization overhead), it maintains a high IPC of 2.4, indicating a fully utilized pipeline. In contrast, the Columnar approach suffers from a low IPC of 0.82, meaning the CPU is stalled for the majority of cycles. This is directly driven by the 35x difference in LLC Load Misses, which forces the Columnar implementation to fetch data from main memory repeatedly. The memory profile further confirms this bottleneck: Columnar mode is severely latency-bound, spending 38.1% of its execution time waiting for DRAM (RAM Hit) and experiencing significant congestion in the Line Fill Buffer (18.9% LFB Hit), while Row-based mode effectively utilizes the cache hierarchy.

The Memory Access Pattern

Why does the non-materializing sort, specifically its gather method, cause so many cache misses? The answer lies in its memory access pattern. Since Velox is a columnar engine, the output is constructed column by column. For each column in an output vector, the gather process does the following:

  1. Iterates through all rows of the current output vector.
  2. For each row, locates the corresponding input vector via the sorted vector index.
  3. Locates the source row within that input vector via the sorted row index.
  4. Copies the data from that single source cell to the target cell (this loop is sketched in code below).
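
The loop structure below is a standalone sketch of this gather, not Velox code. A shuffled list of (vectorIndex, rowIndex) pairs stands in for the data-dependent order produced by sorting, and the column count is reduced so the sketch runs quickly; the point is the loop nest, one full pass over a random access pattern per payload column:

    // Standalone sketch of the gather access pattern: the output is built
    // column by column, and each value comes from a data-dependent position
    // in one of many input vectors. Not Velox code.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <random>
    #include <utility>
    #include <vector>

    int main() {
      constexpr uint32_t kNumVectors = 1000;    // matches the benchmark setup
      constexpr uint32_t kRowsPerVector = 4096;
      constexpr int kPayloadColumns = 8;        // 64/128/256 in the benchmark

      // inputs[v][c] is payload column c of input vector v.
      std::vector<std::vector<std::vector<int64_t>>> inputs(
          kNumVectors,
          std::vector<std::vector<int64_t>>(
              kPayloadColumns, std::vector<int64_t>(kRowsPerVector, 1)));

      // Sorted order as (vectorIndex, rowIndex) pairs; shuffling stands in
      // for the data-dependent order produced by sorting on the keys.
      std::vector<std::pair<uint32_t, uint32_t>> order;
      order.reserve(size_t(kNumVectors) * kRowsPerVector);
      for (uint32_t v = 0; v < kNumVectors; ++v) {
        for (uint32_t r = 0; r < kRowsPerVector; ++r) {
          order.emplace_back(v, r);
        }
      }
      std::shuffle(order.begin(), order.end(), std::mt19937(42));

      // Gather: one full pass over the random order per payload column.
      int64_t sum = 0;
      for (int c = 0; c < kPayloadColumns; ++c) {  // macro: every column re-walks
        for (const auto& [v, r] : order) {         // all input vectors; micro: each
          sum += inputs[v][c][r];                  // read jumps to an unpredictable
        }                                          // address, defeating the prefetcher.
      }
      std::cout << sum << "\n";
      return 0;
    }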

The sorted indices, by nature, offer very little predictability. This forces the gather operation for a single output column to jump unpredictably across as many as 1,000 different input vectors (one per input batch in our benchmark), fetching just one value from each. This random access pattern has two devastating consequences for performance.

First, at the micro-level, every single data read becomes a "long-distance" memory jump. The CPU's hardware prefetcher is rendered completely ineffective by this chaotic access pattern, resulting in almost every lookup yielding a cache miss.

Second, at the macro-level, the problem compounds with each column processed. The volume of data touched in a single column pass already rivals the L3 cache: assuming 8-byte values, one payload column spans roughly 32 MiB scattered across the 1,000 input vectors, and on top of that come the sorted index columns, the output vector under construction, per-vector metadata, and the fact that every random read pulls in a full cache line. This ensures that by the time we start processing the next payload column, the necessary vectors have already been evicted from the cache. Consequently, the gather process must re-fetch the same vector metadata and data from main memory over and over again, once for each of the 256 payload columns. This results in 256 passes of cache-thrashing, random memory access, leading to a catastrophic number of cache misses and explaining the severe performance degradation.

In contrast, Velox’s current row-based approach serializes all input vectors into rows, with each allocation producing a contiguous buffer that holds a subset of those rows. Despite the serialization, the row layout preserves strong locality when materializing output vectors: once rows are in the cache, they can be used to extract multiple output columns. This leads to much better cache-line utilization and fewer cache misses than a columnar layout, where each fetched line often yields only a single value per column. Moreover, the largely sequential scans over contiguous buffers let the hardware prefetcher operate effectively, boosting throughput even in the presence of serialization overhead.

Conclusion

This study reinforces the core principle of performance engineering: Hardware Sympathy. Without understanding the characteristics of the memory hierarchy and optimizing for it, simply reducing the instruction count usually does not guarantee better performance.

Reference

Multi-Round Lazy Start Merge

· 6 min read
Meng Duan (macduan)
Software Engineer @ ByteDance
Xiaoxuan Meng
Software Engineer @ Meta
Pedro Pedreira
Software Engineer @ Meta

Background

Efficiently merging sorted data partitions at scale is crucial for a variety of training data preparation workloads, especially for Generative Recommenders (GRs), a new paradigm introduced in the paper Actions Speak Louder than Words: Trillion-Parameter Sequential Transducers for Generative Recommendations. A key requirement is to merge training data across partitions (for example, merging hourly partitions into daily ones) while ensuring that all rows sharing the same primary key are stored consecutively. Training data is typically partitioned and bucketed by primary key, with rows sharing the same key stored consecutively, so merging across partitions essentially becomes a multi-way merge problem.

Normally, Apache Spark can be used for this sort-merge requirement, for example via CLUSTER BY. However, training datasets for a single job can often reach the PB scale, which in turn generates shuffle data at PB scale. Because we typically apply bucketing and ordering by key when preparing training data in production, Spark can eliminate the shuffle when merging training data from multiple hourly partitions. However, each Spark task can only read the files planned for its split from the various partitions sequentially, placing them into its sorter and spilling as needed. Only after all files have been read does Spark perform a sort-merge of the spilled files. This process produces a large number of small spill files, which further degrades efficiency.

Moreover, Spark’s spill is row-based with a low compression ratio, resulting in approximately 4x amplification compared to the original columnar training data in the data lake. These factors significantly degrade task stability and performance. Velox, by contrast, provides a LocalMerge operator that can be introduced into Apache Spark via Gluten or PySpark on Velox.

Note: To keep the focus on merging, the remainder of this article also assumes that each partition’s training data is already sorted by primary key—a common setup in training data pipelines.

LocalMerge Operator

The LocalMerge operator consolidates its sources’ outputs into a single, sorted stream of rows. It runs single-threaded, while its upstream sources may run multi-threaded within the same task, producing multiple sorted inputs concurrently. For example, when merging 24 hourly partitions into a single daily partition (as shown in the figure below), the merge plan fragment is split into two pipelines:

  • Pipeline 0: contains two operators, TableScan and CallbackSink. 24 drivers are instantiated to scan the 24 hourly partitions.
  • Pipeline 1: contains only a single operator, LocalMerge, with one driver responsible for performing the sort merge.

A CallbackSink operator is installed at the end of each driver in Pipeline 0. It pushes the TableScan operator’s output vectors into the queues backing the merge streams. Inside LocalMerge, a TreeOfLosers performs a k-way merge over the 24 merge streams supplied by the Pipeline 0 drivers, producing a single, globally sorted output stream.
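
To illustrate the k-way merge itself, here is a minimal, self-contained sketch. It uses a std::priority_queue instead of Velox's TreeOfLosers (a loser tree performs fewer comparisons, but the merge semantics are the same), and plain vectors stand in for the queues fed by the Pipeline 0 drivers:

    // Minimal k-way merge over sorted in-memory streams, standing in for
    // LocalMerge's merge over the queues fed by the Pipeline 0 drivers.
    // A std::priority_queue replaces Velox's TreeOfLosers here.
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <tuple>
    #include <vector>

    int main() {
      // Each "stream" is a sorted sequence produced by one upstream driver.
      std::vector<std::vector<int64_t>> streams = {
          {1, 4, 9}, {2, 3, 10}, {5, 6, 7}};

      // Heap entry: (current value, stream index, position within the stream).
      using Entry = std::tuple<int64_t, size_t, size_t>;
      std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;

      // Seed the heap with the first element of every non-empty stream.
      for (size_t s = 0; s < streams.size(); ++s) {
        if (!streams[s].empty()) {
          heap.emplace(streams[s][0], s, 0);
        }
      }

      // Repeatedly pop the global minimum and advance the stream it came from.
      while (!heap.empty()) {
        auto [value, s, pos] = heap.top();
        heap.pop();
        std::cout << value << " ";
        if (pos + 1 < streams[s].size()) {
          heap.emplace(streams[s][pos + 1], s, pos + 1);
        }
      }
      std::cout << "\n";
      return 0;
    }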

Multi-Round Spill Merge

Although LocalMerge minimizes comparisons during merging, preserves row-ordering guarantees, and cleanly isolates the single-threaded merge from the multi-threaded scan phase for predictable performance, it can cause substantial memory pressure—particularly in training-data pipelines. In these workloads, extremely wide tables are common, and even after column pruning, thousands of columns may remain.

Moreover, training data is typically stored in PAX-style formats such as Parquet, ORC, or DWRF. Using Parquet as an example, the reader often needs to keep at least one page per column in memory. As a result, simply opening a Parquet file with thousands of columns can consume significant memory even before any merging occurs. Wide schemas further amplify per-column metadata, dictionary pages, and decompression buffers, inflating the overall footprint. In addition, the k-way merge must hold input vectors from multiple sources concurrently, which drives peak memory usage even higher.

To cap memory usage and avoid OOM when merging a large number of partitions, we extend LocalMerge to process fewer local sources at a time, leverage existing spill facilities to persist intermediate results, and introduce lazy-start activation for merge inputs. Using the case of merging 24 hourly partitions into a single daily partition, the process is organized into two phases:

Phase 1

  1. Break the scan-and-merge into multiple rounds (e.g., 3 rounds).
  2. In each round, lazily start a limited number of drivers (e.g., drivers 0–7, eight at a time).
  3. The started drivers scan data and push it into the queues backing their respective merge streams.
  4. Perform an in-memory k-way merge and spill the results, producing a spill-file group (one or more spill files per group).
  5. After all inputs from drivers 0–7 are consumed and spilled, the drivers are closed, the file streams opened by their TableScan operators are closed, and the associated memory is released.
  6. Repeat the above steps for the remaining rounds (drivers 8–15, then drivers 16–23), ensuring peak memory stays within budget (a sketch of this round structure follows the list).
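
The round structure of Phase 1 can be sketched as follows. In-memory vectors stand in for scanned partitions and spill-file groups, and a trivial mergeSorted() helper (a made-up name) stands in for the in-memory k-way merge and spill; the real operator drives TableScan drivers and Velox's spill facilities instead:

    // Model of Phase 1: merge 24 sources in rounds of 8, "spilling" each
    // round's merged output. In-memory vectors stand in for scanned
    // partitions and spill-file groups; not Velox code.
    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Stand-in for the in-memory k-way merge and spill of one round.
    std::vector<int64_t> mergeSorted(
        const std::vector<std::vector<int64_t>>& sources) {
      std::vector<int64_t> merged;
      for (const auto& s : sources) {
        merged.insert(merged.end(), s.begin(), s.end());
      }
      std::sort(merged.begin(), merged.end());  // shortcut for a loser-tree merge
      return merged;
    }

    int main() {
      constexpr int kNumSources = 24;      // 24 hourly partitions
      constexpr int kSourcesPerRound = 8;  // cf. local_merge_max_num_merge_sources

      // Each inner vector models one driver's sorted scan output.
      std::vector<std::vector<int64_t>> sources(kNumSources);
      for (int i = 0; i < kNumSources; ++i) {
        sources[i] = {i, i + 100, i + 200};
      }

      // Scan-and-merge in rounds; each round produces one spill-file group.
      std::vector<std::vector<int64_t>> spillGroups;
      for (int start = 0; start < kNumSources; start += kSourcesPerRound) {
        std::vector<std::vector<int64_t>> round(
            sources.begin() + start,
            sources.begin() + std::min(start + kSourcesPerRound, kNumSources));
        spillGroups.push_back(mergeSorted(round));
        // Here the round's drivers would be closed and their memory released.
      }
      // Phase 2 would then k-way merge the 3 spill groups into the final output.
      return spillGroups.size() == 3 ? 0 : 1;
    }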

Phase 2

  1. Create a concatenated file stream for each spill-file group produced in Phase 1.
  2. Schedule one async callback for each concatenated stream to prefetch and push data into a merge stream.
  3. Merge the outputs of the three merge streams using a k-way merge (e.g., a loser-tree), and begin streaming the final, globally sorted results to downstream operators.
  4. The number of rows per output batch is limited adaptively by estimating the row size of the merge streams, using the average row size observed in their first batches (see the sketch below).
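
Here is a small sketch of the adaptive limit in step 4, assuming a preferred output batch size in bytes; the function and parameter names are made up for illustration:

    // Sketch of adaptive output batching: derive a row limit per output batch
    // from the average row size observed in the first batch. The function and
    // the preferred batch size are illustrative, not Velox configuration.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int64_t adaptiveBatchRows(
        int64_t firstBatchBytes,
        int64_t firstBatchRows,  // assumed > 0
        int64_t preferredOutputBatchBytes) {
      // Average row size estimated from the first batch of the merge streams.
      const int64_t avgRowSize =
          std::max<int64_t>(1, firstBatchBytes / firstBatchRows);
      // Cap the rows per batch so each output batch stays near the target size.
      return std::max<int64_t>(1, preferredOutputBatchBytes / avgRowSize);
    }

    int main() {
      // Example: a first batch of 1,024 rows occupying 8 MiB -> ~8 KiB per row;
      // with a 10 MiB target batch size, emit at most 1,280 rows per batch.
      std::cout << adaptiveBatchRows(8 << 20, 1024, 10 << 20) << "\n";
      return 0;
    }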

How To Use

Set local_merge_spill_enabled to true to enable spilling for the LocalMerge operator (it is false by default). Then, set local_merge_max_num_merge_sources to control the number of merge sources per round according to your memory management strategy.
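
How these settings reach Velox depends on your integration (Gluten, PySpark on Velox, or a native application); the sketch below simply shows them in a string-keyed configuration map, which is an assumption about plumbing rather than a documented API:

    // Illustrative only: a string-keyed config map carrying the two LocalMerge
    // settings described above. How configs actually reach Velox depends on
    // the integration (Gluten, PySpark on Velox, or a native application).
    #include <iostream>
    #include <string>
    #include <unordered_map>

    int main() {
      std::unordered_map<std::string, std::string> queryConfig = {
          // Enable spilling in the LocalMerge operator (off by default).
          {"local_merge_spill_enabled", "true"},
          // Merge at most 8 sources per round; tune to your memory budget.
          {"local_merge_max_num_merge_sources", "8"},
      };

      for (const auto& [key, value] : queryConfig) {
        std::cout << key << " = " << value << "\n";
      }
      return 0;
    }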

Note: An executor must be configured for spilling, as it is used to schedule an asynchronous callback for each concatenated stream that prefetches data and pushes it into the merge stream.

Future Work

We plan to adjust the number of merge sources dynamically based on available memory, rather than having it fixed by the local_merge_max_num_merge_sources parameter. The process would start with a small number of sources, such as 2, and incrementally increase the count for subsequent rounds (e.g., to 4) as long as sufficient memory is available, stopping once it reaches a memory-constrained limit.

Acknowledgements

Thanks to Xiaoxuan Meng and Pedro Pedreira for their guidance, review, and brainstorming. I also appreciate the excellent collaboration and work from my colleagues, Xiang Yao, Gang Wang, and Weixin Xu.