Bringing the Instructions To The Data

Why? What?

Since leaving Old Job I've been missing interesting database work.

One paper that has been in the back of my mind is for a few years is Efficiently Compiling Efficient Query Plans for Modern Hardware1 which details how Tableau's internal engine, HyPer, compiles SQL statements into LLVM IR. I also recently learned of the LLVM IR library Inkwell(s). Inkwell allows a mostly safe way to build LLVM IR through a typed Rust interface, JIT compile it, and execute it. Seems like these would go perfectly hand-in-hand.

General caveats is that I am not a Rust or LLVM IR expert by any means.

The curious can skip to to the final LLVM IR sum2 and the sum with division3 in the references below. Inkwell examples will be sprinkled throughout.

We will use the NYC Taxi dataset from 2024-099 loaded in the Arrow format and operate on the first RecordBatch. This RecordBatch contains 131072 values of f64s so we're looking at ~1MB of data to start out.

Here is the example object that we'll focus on:

CREATE TABLE taxi_data(
    fare FLOAT64,
    tip FLOAT64);

And here is the query:

SELECT SUM(tip / fare)
FROM taxi_data;

The query doesn't make a ton of sense, but I haven't implemented AVG yet so SUM will have to do. We mainly use this because SUM(tip + fare) can be decomposed to SUM(tip) + SUM(fare) which isn't all that interesting in comparison.

To start, lets consider the "naive" way a query planner would compile a query. Some create a tree of heap-allocated polymorphic operations, apply each operation to each tuple by calling the next() function, and continue until exhausted. These are often based on the Volcano optimizer, the seminal optimizer which split plans between the logical / physical plans and describes the "iterator" interface. We'll focus on the physical plan going forward.

If we were to simulate the approach mentioned above where we allocate a tree of operators on the heap, we could create two Box'd functions and apply them to each of the tuples.

/// Create an addition operation.
fn add_f64() -> Box<dyn Fn(f64, f64) -> f64> {
    Box::new(|acc, i| acc + i)
}

/// Create a division operation.
fn divide_f64() -> Box<dyn Fn(f64, f64) -> f64> {
    Box::new(|lhs, rhs| lhs / rhs)
}

Then we could apply these operators on two columns loaded in memory.

fn main() {
    // Load these from the NYC taxi data parquet file.
    let fare_values: Vec<f64> = vec![...];
    let tip_values: Vec<f64> = vec![...];
    
    for i in 0..fare_values.len() {
        tuple_result = 
            add_f64()(tuple_result, 
                divide_f64()(tip_values[i], fare_values[i]));
    }
}

That is all well and good (we skip other C++ class-based complexity like symbol table lookups and whatnot). We can run this using criterion for funsies on an older Macbook quad-core i7, 7MB L3 cache, with 32GB of memory and we'll see:

f64_taxi_tip_percent_sum/heap-allocated instructions
                        time:   [340.94 µs 343.78 µs 346.92 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

Which is okay, but... We. Can. Do. Better. This bringing the data to the instructions.

Hand-Written Approach

One thing that the HyPer paper mentions is the comparison to a "hand-written C" program so lets make a quick and dirty Rust program to compare. This is the first version of "bringing the instructions to the data". We aren't accessing the heap on each operator invocation.

So lets do that. Here is a simple for-loop

fn main() {
    // Load data...
    let mut hand_written: f64 = 0.0;
    for i in 0..fare_values.len() {
        hand_written = 
            hand_written + 
                (tip_values[i] / fare_values[i])
    }
}

Pretty straightforward, divide two column values by each other and add that result to an accumulator. Here are the results from criterion:

f64_taxi_tip_percent_sum/heap-allocated instructions
                        time:   [340.94 µs 343.78 µs 346.92 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe
f64_taxi_tip_percent_sum/for loop
                        time:   [143.81 µs 145.01 µs 146.37 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

So we're at 151 µs vs 332 µs. Not bad! A quick rewrite but not super extensible. Compiling Rust is...slow. Very slow. So even if we generated a sizable program for this, a fair amount of our execution budget would be spent on building the optimized executable.

Kicking It Up a Notch: Vectorized LLVM IR

Lets skip the middle man of rustc and go straight to the source (IR). I won't claim that I'm better than rustc or the multitude of people who have written it, BUT I know what I want to do for my very specific case.

LLVM Intermediate Representation (IR) is an extensive low level assembly-like language that implements many many target (x86, ARM, etc.) specific operations. This article by Miguel Young de la Sota is a fantastic description and helped me very much in writing this. LLVM exposes a JIT compiler for the IR which applies optimizations of its own. I have zero LLVM IR experience but I found it pretty straightforward to write after a bit of practice and find it similar to functional programming.

There will be quite a bit of example code (and a bit of hand-waving) following this but the high level interface that we want to implement is the following "physical plan." The arguments passed to these functions correspond to each vector argument and their length, as these build are based on the 4 argument C function with arguments *const double, length1: c_int, *const double, length2: c_int

self.entry_point(0, 1)
self.begin_f64_sum_array(0, 1)
self.inject_f64_vectorized_division(2, 3)
self.end_f64_sum_array()
self.return()

So we have a simple higher level query representation that we can lower into the LLVM IR.

One of the most compelling features of using LLVM IR is how easy it is to call SIMD (vector) operations to perform bulk calculations. This example snippet loads a value from a passed array into a <4 x double> 256 bit vector and performs vectorized division:

  %"divisor_vector_ptr" = getelementptr inbounds double, ptr %2, i32 %counter_phi
  %"divisor_vector_values" = load <4 x double>, ptr %"divisor_vector_ptr", align 32
  %"vectorized_division" = fdiv <4 x double> %"vector_values", %"divisor_vector_values"

We'll get into what counter_phi is in a bit.

Here is the Inkwell representation of this. It will usually be a bit more verbose, but I find that writing this in Rust lends a large amount of confidence compared to hand written experiments. Additionally for these examples, we are relying on a similar struct found in the Inkwell's README to maintain the CodeGen state and the same mechanism to execute the compiled IR. And onto the example:

// Extract function argument.
let array_ptr = function.get_nth_param(0)?.into_pointer_value();

// Load the pointer into a vector based on the current loop iteration.
let load_divisor = unsafe {
    let vector_ptr = self
        .builder
        .build_in_bounds_gep(
            f64_type,
            array_ptr,
            &[phi_counter.as_basic_value().into_int_value()],
            "divisor_vector_ptr",
        )
        .unwrap();

    // Explicitly load vector values
    self.builder
        .build_load(vector_type, vector_ptr, "divisor_vector_values")
        .unwrap()
};

// Perform division.
let division_result_vector = self
    .builder
    .build_float_div(
        current_vector,
        load_divisor.into_vector_value(),
        "vectorized_division"
    )
    .unwrap();

So that is vectorized division.

In the original paper the looping is glossed over a bit.

A Quick Aside on LLVM IR Loops

As I learned about The IR I learned how to write loops. We have to reach for a PHI, whose value is determined on the predecessor block and it is used to track long term values. And much like "regular" assembly LLVM IR relies on branches to go to labeled blocks.

entry:
  br label %loop

Here we define the entrypoint to the program as an unconditional branch into the loop. Here is the corresponding Inkwell:

let entry_block = self.context.append_basic_block(function, "entry");
let loop_block = self.context.append_basic_block(function, "loop");

self.builder.position_at_end(entry_block);

self.builder.build_unconditional_branch(loop_block).unwrap();
self.builder.position_at_end(loop_block);

Then we define the PHI node:

loop:                                             ; preds = %loop, %entry
  %counter_phi = phi i32 [ 0, %entry ], [ %next_counter, %loop ]
  %loop_cond = icmp slt i32 %counter_phi, %1
  ; ... Math

Corresponding Inkwell:

// PHI node for loop
let phi_counter = self.builder.build_phi(i32_type, "counter_phi").unwrap();
// Adds the entry block to the PHI.
phi_counter.add_incoming(&[(&i32_type.const_int(0, false), entry_block)]);

To exit the loop we increment the counter, add a conditional branch, and add the loop block to the phi node.

  %next_counter = add i32 %counter_phi, 4
  br i1 %loop_cond, label %loop, label %end_loop

Corresponding Inkwell:

let end_block = self.context.append_basic_block(function, "end_loop");

// Increment counter
let next_counter = self
    .builder
    .build_int_add(
        phi_counter.as_basic_value().into_int_value(),
        i32_type.const_int(vector_width.into(), false),
        "next_counter",
    )
    .unwrap();

self.builder
    .build_conditional_branch(loop_condition, loop_block, end_block)
    .unwrap();
self.builder.position_at_end(end_block);
phi_counter.add_incoming(&[(&next_counter, loop_block)]);

self.builder.position_at_end(end_block);

SUM (PHI Accumulator)

Addition is pretty much the same as division except we store the values into a new PHI used to track the results through the whole loop:

loop:                                             ; preds = %loop, %entry
  %phi_sum = phi <4 x double> [ zeroinitializer, %entry ], [ %"vector_sum", %loop ]
  %counter_phi = phi i32 [ 0, %entry ], [ %next_counter, %loop ]
  ...
  %"vector_sum" = fadd <4 x double> %"vectorized_division", %phi_sum

Wrapping It Up

The above, where we continually add two vectors to each other, is called "vertical sum". To finish this, we need to create the body of end block to "horizontally sum" the values in the final vector. This just means we sum up all of the values in the vector. Vertical summing is much faster in general.

end_loop:                                         ; preds = %loop
  %first_element = extractelement <4 x double> %phi_sum, i32 0
  %first_element1 = extractelement <4 x double> %phi_sum, i32 1
  %pair_sum = fadd double %first_element, %first_element1
  %third_element = extractelement <4 x double> %phi_sum, i32 2
  %fourth_element = extractelement <4 x double> %phi_sum, i32 3
  %last_pair_sum = fadd double %third_element, %fourth_element
  %final_sum = fadd double %pair_sum, %last_pair_sum
  ret double %final_sum

And how this looks in Inkwell4:

// Horizontal sum final vector
let const_zero = i32_type.const_int(0, false);
let const_one = i32_type.const_int(1, false);
let const_two = i32_type.const_int(2, false);
let const_three = i32_type.const_int(3, false);
let phi_sum_vector = phi_sum.as_basic_value().into_vector_value();
let paired_sum = self
    .builder
    .build_float_add(
        self.builder
            .build_extract_element(phi_sum_vector, const_zero, "first_element")
            .unwrap()
            .into_float_value(),
        self.builder
            .build_extract_element(phi_sum_vector, const_one, "first_element")
            .unwrap()
            .into_float_value(),
        "pair_sum",
    )
    .unwrap();

let last_paird_sum = self
    .builder
    .build_float_add(
        self.builder
            .build_extract_element(phi_sum_vector, const_two, "third_element")
            .unwrap()
            .into_float_value(),
        self.builder
            .build_extract_element(phi_sum_vector, const_three, "fourth_element")
            .unwrap()
            .into_float_value(),
        "last_pair_sum",
    )
    .unwrap();

// Final reduction.
let final_sum = self
    .builder
    .build_float_add(paired_sum, last_paird_sum, "final_sum")
    .unwrap();

// Return value
self.builder.build_return(Some(&final_sum)).unwrap();

Final Results

Well is it worth it? The answer is...sorta kinda. For the 1MB sample we reduce this down to 74 µs. Was it worth the 4 evenings of learning how to write a vectorized for-loop? Maybe a bit.

f64_taxi_tip_percent_sum/heap-allocated instructions
                        time:   [340.94 µs 343.78 µs 346.92 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe
f64_taxi_tip_percent_sum/for loop
                        time:   [143.81 µs 145.01 µs 146.37 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
f64_taxi_tip_percent_sum/llvm_vectorized
                        time:   [73.871 µs 74.473 µs 75.050 µs]
Found 9 outliers among 100 measurements (9.00%)
  5 (5.00%) low mild
  4 (4.00%) high mild

To summarize why this is fast: we do not have to pull operators from the heap on each operator invocation, and we explicitly use vector instructions for faster operations instead of trying to convince the compiler to use them. We are bringing the instructions to the data vs pushing a small amount of data through each instruction. Beyond that we're focusing on columnar formats here so we also bypass loading data tuple by tuple, but that is a given in this scenario.

20x To The Moon

I decided to try this on a AWS 2xlarge instance. 1MB of data is a bit small. If we 20x the inputs, going from 1MB to 20MB we see a larger difference in the runtimes. I suspect that both of these columns fit well in the 45MB L3 cache so they are nice and hot while iterating.5678

f64_taxi_tip_percent_sum/heap-allocated instructions
                        time:   [9.4463 ms 9.4671 ms 9.4876 ms]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
f64_taxi_tip_percent_sum/for loop
                        time:   [4.9242 ms 4.9383 ms 4.9526 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
Benchmarking f64_taxi_tip_percent_sum/llvm_vectorized: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
f64_taxi_tip_percent_sum/llvm_vectorized
                        time:   [1.1354 ms 1.1458 ms 1.1575 ms]
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  6 (6.00%) high mild
  1 (1.00%) high severe

Not the worst results. There is probably a huge amount of room for improvement, but it was a fun experiment!

Refs

1. Umbra

Also check out the Umbra database if you find the first linked paper interesting! Honestly all of the works plubished by Prof. Alfons Kemper and Prof. Dr. Thomas Neumann are enlightening.

2. Vectorized LLVM IR SUM
define double @f64_sum_array(ptr %0, i32 %1, ptr %2, i32 %3) {
entry:
  br label %loop

loop:                                             ; preds = %loop, %entry
  %phi_sum = phi <4 x double> [ zeroinitializer, %entry ], [ %"vector_sum", %loop ]
  %counter_phi = phi i32 [ 0, %entry ], [ %next_counter, %loop ]
  %loop_cond = icmp slt i32 %counter_phi, %1
  %"vector_ptr" = getelementptr inbounds double, ptr %0, i32 %counter_phi
  %"vector_values" = load <4 x double>, ptr %"vector_ptr", align 32
  %"vector_sum" = fadd <4 x double> %"vector_values", %phi_sum
  %next_counter = add i32 %counter_phi, 4
  br i1 %loop_cond, label %loop, label %end_loop

end_loop:                                         ; preds = %loop
  %first_element = extractelement <4 x double> %phi_sum, i32 0
  %first_element1 = extractelement <4 x double> %phi_sum, i32 1
  %pair_sum = fadd double %first_element, %first_element1
  %third_element = extractelement <4 x double> %phi_sum, i32 2
  %fourth_element = extractelement <4 x double> %phi_sum, i32 3
  %last_pair_sum = fadd double %third_element, %fourth_element
  %final_sum = fadd double %pair_sum, %last_pair_sum
  ret double %final_sum
}
3. Vectorized LLVM IR SUM + division.
define double @f64_sum_array(ptr %0, i32 %1, ptr %2, i32 %3) {
entry:
  br label %loop

loop:                                             ; preds = %loop, %entry
  %phi_sum = phi <4 x double> [ zeroinitializer, %entry ], [ %"vector_sum", %loop ]
  %counter_phi = phi i32 [ 0, %entry ], [ %next_counter, %loop ]
  %loop_cond = icmp slt i32 %counter_phi, %1
  %"vector_ptr" = getelementptr inbounds double, ptr %0, i32 %counter_phi
  %"vector_values" = load <4 x double>, ptr %"vector_ptr", align 32
  %"divisor_vector_ptr" = getelementptr inbounds double, ptr %2, i32 %counter_phi
  %"divisor_vector_values" = load <4 x double>, ptr %"divisor_vector_ptr", align 32
  %"vectorized_division" = fdiv <4 x double> %"vector_values", %"divisor_vector_values"
  %"vector_sum" = fadd <4 x double> %"vectorized_division", %phi_sum
  %next_counter = add i32 %counter_phi, 4
  br i1 %loop_cond, label %loop, label %end_loop

end_loop:                                         ; preds = %loop
  %first_element = extractelement <4 x double> %phi_sum, i32 0
  %first_element1 = extractelement <4 x double> %phi_sum, i32 1
  %pair_sum = fadd double %first_element, %first_element1
  %third_element = extractelement <4 x double> %phi_sum, i32 2
  %fourth_element = extractelement <4 x double> %phi_sum, i32 3
  %last_pair_sum = fadd double %third_element, %fourth_element
  %final_sum = fadd double %pair_sum, %last_pair_sum
  ret double %final_sum
}
4. The wrong results.

The astute reader will notice that float operations are not communicative and this produces slightly wrong results. The more you know!

5, 6, 7, 8. Further reading (watching).

This style of data-oriented thinking has come from a few other sources too.

  1. Yellowbrick CMU Talk (and all of the CMU database talks hosted by Andy Pavlo are worth watching as well!)

  2. CppCon 2014: Mike Acton "Data-Oriented Design and C++"

  3. Andrew Kelley Practical Data Oriented Design (DoD)

  4. Self-plug my talk at the Erlang conference for Database Virtualization :)

9. Taxi Data

https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-09.parquet


2561 Words

2024-12-02