Rust 1 Billion Row Challenge without Dependencies



Table of Contents

  1. Introduction
  2. Base Naive Implementation (90 seconds)
  3. Multithreading Solution (17.96 secs - 80% improvement)
  4. Custom Number Parsing (8.1 seconds - 54.9% improvement)
  5. Custom Key Parsing (6.76 seconds - 16.5% improvement)
  6. Custom Hash Function (5.85 seconds - 13.5% improvement)
  7. Unsafe String Parsing (5.16 seconds - 11.8% improvement)
  8. Conclusion

Introduction

On January 1st, 2024, Gunnar Morling announced the 1 Billion Row Challenge (1BRC). The challenge is to write a Java program to read temperature data from a text file and find the minimum, average, and maximum temperatures for each weather station. The file has 1,000,000,000 rows.

The text file has a simple structure with one measurement value per row:

Graus;12.0
Zaragoza;8.9
Madrid;38.8
Paris;15.2
London;12.6
...

The program should print out the min, mean, and max values per station, ordered alphabetically like so:

{Graus=5.0/18.0/27.4, Madrid=15.7/26.0/34.1, New York=12.1/29.4/35.6, ...}

I was curious and read several implementations in Rust. They were really good and optimized, and I learned a lot from them. However, they did not follow one of the rules of the 1BRC: no external dependencies may be used.

I tried running the official Rust solution with my 1 billion rows test file on my SER5 MAX mini PC. It took 5.7 seconds to execute.

I decided to write my own solution in Rust without using any external crates. My goal was to achieve similar performance to the official solution while keeping the code simple and short.

The code is available on this GitHub repository.

Base Naive Implementation (90 seconds)

I started by writing a simple, naive, unoptimized first version to serve as a baseline for further improvements.

use std::{
    collections::BTreeMap,
    fmt::Display,
    fs::File,
    io::{BufRead, BufReader, Result},
    time::Instant,
};

fn main() {
    /*
    The release build is executed in around 90 seconds on SER5 MAX:
       - CPU: AMD Ryzen 7 5800H with Radeon Graphics (16) @ 3.200GHz
       - GPU: AMD ATI Radeon Vega Series / Radeon Vega Mobile Series
       - Memory: 28993MiB
    */
    let start = Instant::now();

    let reader = get_file_reader().unwrap();
    let station_to_metrics = build_map(reader).unwrap();
    print_metrics(station_to_metrics);

    let duration = start.elapsed();
    println!("\n Execution time: {:?}", duration);
}

fn get_file_reader() -> Result<BufReader<File>> {
    let file: File = File::open("./data/weather_stations.csv")?;
    Ok(BufReader::new(file))
}

fn build_map(file_reader: BufReader<File>) -> Result<BTreeMap<String, StationMetrics>> {
    let mut station_to_metrics = BTreeMap::<String, StationMetrics>::new();
    for line in file_reader.lines() {
        let line = line?;
        let (city, temperature) = line.split_once(';').unwrap();
        let temperature: f32 = temperature.parse().expect("Incorrect temperature");
        station_to_metrics
            .entry(city.to_string())
            .or_default()
            .update(temperature);
    }
    Ok(station_to_metrics)
}

// BTreeMap already sorts keys in ascending order.
fn print_metrics(station_to_metrics: BTreeMap<String, StationMetrics>) {
    for (i, (name, state)) in station_to_metrics.into_iter().enumerate() {
        if i == 0 {
            print!("{name}={state}");
        } else {
            print!(", {name}={state}");
        }
    }
}

#[derive(Debug)]
struct StationMetrics {
    sum_temperature: f64,
    num_records: u32,
    min_temperature: f32,
    max_temperature: f32,
}

impl StationMetrics {
    fn update(&mut self, temperature: f32) {
        self.max_temperature = self.max_temperature.max(temperature);
        self.min_temperature = self.min_temperature.min(temperature);
        self.num_records += 1;
        self.sum_temperature += temperature as f64;
    }
}

impl Default for StationMetrics {
    fn default() -> Self {
        StationMetrics {
            sum_temperature: 0.0,
            num_records: 0,
            min_temperature: f32::MAX,
            max_temperature: f32::MIN,
        }
    }
}

impl Display for StationMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let avg_temperature = self.sum_temperature / (self.num_records as f64);
        write!(
            f,
            "{:.1}/{avg_temperature:.1}/{:.1}",
            self.min_temperature, self.max_temperature
        )
    }
}

As simple as this:

  1. It opens the CSV file ./data/weather_stations.csv and creates a buffered reader for efficient file reading.
  2. It reads each line of the file, splitting each line into a city name and a temperature value. It uses a BTreeMap to store and update temperature statistics (min, mean, and max) for each city. The BTreeMap automatically keeps the city names sorted.
  3. For each city, it has a StationMetrics struct that tracks the sum, count, min, and max temperatures. The implementation updates these metrics as it processes each line of the file.
  4. Once all data is processed, we print the temperature statistics for each city.
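To experiment with the aggregation logic without a 1 billion row file, the same idea can be run against an in-memory reader. This is only a minimal sketch: `Stats` and `aggregate` are hypothetical names, not part of the repository, and `Stats` is a trimmed-down stand-in for StationMetrics.

```rust
use std::collections::BTreeMap;
use std::io::{BufRead, Cursor};

/// Hypothetical, trimmed-down stand-in for the article's StationMetrics.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Stats {
    min: f32,
    max: f32,
    sum: f64,
    count: u32,
}

/// Aggregates `name;temperature` lines from any buffered reader.
fn aggregate<R: BufRead>(reader: R) -> BTreeMap<String, Stats> {
    let mut map: BTreeMap<String, Stats> = BTreeMap::new();
    for line in reader.lines() {
        let line = line.expect("failed to read line");
        let (city, temp) = line.split_once(';').expect("missing ';'");
        let temp: f32 = temp.parse().expect("invalid temperature");
        map.entry(city.to_string())
            .and_modify(|s| {
                s.min = s.min.min(temp);
                s.max = s.max.max(temp);
                s.sum += temp as f64;
                s.count += 1;
            })
            .or_insert(Stats {
                min: temp,
                max: temp,
                sum: temp as f64,
                count: 1,
            });
    }
    map
}
```

Calling `aggregate(Cursor::new("Madrid;38.8\nGraus;12.0\nMadrid;15.7\n"))` yields two entries, with Graus ordered before Madrid because the BTreeMap keeps its keys sorted.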

It runs in 90 seconds on my mini PC. This is far from the 5.7 seconds of the official implementation. Let’s get started!

Link to commit

I also wrote this script to create a sample test file to try against. In the repository, you can execute it by running cargo run --bin create_data_file.

Multithreading Solution (17.96 secs - 80% improvement)

The first thing that came to mind to improve performance was to introduce multithreading, as my mini PC has a CPU with 8 cores and 16 threads. We can follow this strategy: create as many threads as the number of CPU threads (N). Then, each thread will process 1/N of the file in parallel. Finally, we will merge the results from all threads to calculate the final result.

Here are the changes:

The main function asks the standard library how much parallelism is available (16 hardware threads on this machine) to decide how many threads to spawn.

fn main() {
    /*
    The release build is executed in around 17.96 seconds on SER5 PRO MAX:
       - CPU: AMD Ryzen 7 5800H with Radeon Graphics (16) @ 3.200GHz
       - GPU: AMD ATI Radeon Vega Series / Radeon Vega Mobile Series
       - Memory: 28993MiB
    */
    let start = Instant::now();

    let n_threads: usize = std::thread::available_parallelism().unwrap().into();

    ...

Then it divides the file into intervals based on n_threads. Each interval represents a chunk of the file to be processed by a thread in parallel.

Note that the function that calculates the intervals ensures that no lines are split between chunks by moving each end position forward to the end of its line.

/// Splits the file into intervals based on the number of CPUs.
/// Each interval is determined by dividing the file size by the number of CPUs
/// and adjusting the intervals to ensure lines are not split between chunks.
///
/// Example:
///
/// Suppose the file size is 1000 bytes and `cpus` is 4.
/// The file will be divided into 4 chunks, and the intervals might be as follows:
///
/// ```text
/// Interval { start: 0, end: 249 }
/// Interval { start: 250, end: 499 }
/// Interval { start: 500, end: 749 }
/// Interval { start: 750, end: 999 }
/// ```
fn get_file_intervals_for_cpus(
    cpus: usize,
    file_size: u64,
    reader: &mut BufReader<File>,
) -> Vec<Interval> {
    let chunk_size = file_size / (cpus as u64);
    let mut intervals = Vec::new();
    let mut start = 0;
    let mut buf = String::new();

    for _ in 0..cpus {
        let mut end: u64 = (start + chunk_size).min(file_size);
        _ = reader.seek(SeekFrom::Start(end));
        let bytes_until_end_of_line = reader.read_line(&mut buf).unwrap();
        end = end + (bytes_until_end_of_line as u64) - 1; // -1 because read_line() also reads the '\n'

        intervals.push(Interval { start, end });

        start = end + 1;
        buf.clear();
    }
    intervals
}
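The boundary adjustment is easier to see on an in-memory byte slice. The following is a sketch of the same idea, not the repository's code: `line_aligned_ranges` is a hypothetical helper, and it returns half-open `(start, end)` ranges where `end` sits just past a newline, which differs slightly from the inclusive Interval used above.

```rust
/// Splits `data` into at most `n` byte ranges that always end right after a
/// newline (or at the end of the data), so no line straddles two ranges.
fn line_aligned_ranges(data: &[u8], n: usize) -> Vec<(usize, usize)> {
    let n = n.max(1);
    // Target size of each range before it is extended to a line boundary.
    let target = (data.len() / n).max(1);
    let mut ranges = Vec::with_capacity(n);
    let mut start = 0;
    while start < data.len() {
        // Tentative end, then extend it to just past the next newline.
        let tentative = (start + target).min(data.len());
        let end = match data[tentative..].iter().position(|&b| b == b'\n') {
            Some(offset) => tentative + offset + 1,
            None => data.len(),
        };
        ranges.push((start, end));
        start = end;
    }
    ranges
}
```

For the 21-byte input `b"a;1.0\nbb;2.0\nccc;3.0\n"` and `n = 2`, the tentative cut at byte 10 falls inside the second line, so the first range is extended to `(0, 13)` and the second becomes `(13, 21)`.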

For each interval, a new thread is spawned to process the corresponding file chunk. The process_chunk function reads the assigned file chunk and builds its own StationsMap for that chunk.


fn process_chunk(file_path: &Path, interval: Interval) -> StationsMap {
    let mut reader = get_file_reader(file_path).unwrap();
    // Starts from the interval start.
    _ = reader.seek(SeekFrom::Start(interval.start));
    // The reader only reads the number of bytes in that interval.
    let chunk_reader = reader.take(interval.end - interval.start);
    build_map(chunk_reader).unwrap()
}

After all threads complete, their results are merged into a single map using the merge_maps function.

fn merge_maps(a: StationsMap, b: &StationsMap) -> StationsMap {
    let mut merged_map = a;
    for (k, v) in b {
        merged_map.entry(k.into()).or_default().merge(v);
    }
    merged_map
}
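The merge method called above is not shown in this post. A plausible sketch, assuming the same four fields as the StationMetrics struct from the naive version, could look like this:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct StationMetrics {
    sum_temperature: f64,
    num_records: u32,
    min_temperature: f32,
    max_temperature: f32,
}

impl StationMetrics {
    /// Folds another partial result for the same station into this one.
    fn merge(&mut self, other: &StationMetrics) {
        self.sum_temperature += other.sum_temperature;
        self.num_records += other.num_records;
        self.min_temperature = self.min_temperature.min(other.min_temperature);
        self.max_temperature = self.max_temperature.max(other.max_temperature);
    }

    /// Mean over all merged records, computed from the running sum and count.
    fn mean(&self) -> f64 {
        self.sum_temperature / self.num_records as f64
    }
}
```

Keeping the sum and the count, rather than a per-chunk average, is what makes the merge exact: averaging the averages of chunks with different record counts would give the wrong mean.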

Here is the final main function to get a better overview of all parts.


fn main() {
    let start = Instant::now();

    // Number of threads available.
    let n_threads: usize = std::thread::available_parallelism().unwrap().into();

    let file_path = Path::new("./data/weather_stations.csv");
    let file_size = fs::metadata(file_path).unwrap().len(); // len() works on every platform
    let mut reader = get_file_reader(file_path).unwrap();

    // Divide the file into n_threads intervals.
    let intervals = get_file_intervals_for_cpus(n_threads, file_size, &mut reader);

    // Vector that contains all partial results of each thread.
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();

    for interval in intervals {
        let results = Arc::clone(&results);

        // Each thread processes a chunk in parallel.
        let handle = thread::spawn(move || {
            let station_to_metrics = process_chunk(file_path, interval);
            results.lock().unwrap().push(station_to_metrics);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().expect("Thread panicked");
    }

    // Combines all partial results into the final result.
    let result = results
        .lock()
        .unwrap()
        .iter()
        .fold(StationsMap::default(), |a, b| merge_maps(a, &b));

    print_metrics(&result);
    println!("\n Execution time: {:?}", start.elapsed());
}

This solution improves the execution time from 90 seconds to 17.96 seconds. Great achievement! But we need to do better to get closer to the 5.7 seconds of the official solution. Let’s continue optimizing!
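As a side note on the design, the shared Arc<Mutex<Vec<_>>> is not the only way to collect partial results. A possible alternative, sketched here with hypothetical names (`count_chunk`, `count_parallel`) and a simplified per-chunk task that only counts rows per station, is to let each thread return its map through its join handle using std::thread::scope:

```rust
use std::collections::BTreeMap;
use std::thread;

/// Counts lines per station in one chunk (a stand-in for the real per-chunk work).
fn count_chunk(chunk: &str) -> BTreeMap<String, u32> {
    let mut map = BTreeMap::new();
    for line in chunk.lines() {
        if let Some((city, _)) = line.split_once(';') {
            *map.entry(city.to_string()).or_insert(0) += 1;
        }
    }
    map
}

/// Processes every chunk on its own scoped thread and merges the partial maps.
fn count_parallel(chunks: &[&str]) -> BTreeMap<String, u32> {
    let partials: Vec<BTreeMap<String, u32>> = thread::scope(|scope| {
        // Spawn all workers first, then join them, so they run concurrently.
        let handles: Vec<_> = chunks
            .iter()
            .map(|chunk| scope.spawn(move || count_chunk(chunk)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    });

    let mut merged = BTreeMap::new();
    for partial in partials {
        for (city, n) in partial {
            *merged.entry(city).or_insert(0) += n;
        }
    }
    merged
}
```

Scoped threads may borrow from the caller, so no Arc or Mutex is needed. I have not measured whether this changes the timings reported here; the lock in the original is only taken once per thread, so any difference is likely small.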

Link to commit

Custom Number Parsing (8.1 seconds - 54.9% improvement)

Let’s use cargo flamegraph to see where the current solution spends its time and decide what to optimize first. Since I am using Fedora, it uses perf under the hood:

cargo flamegraph -b one-billion-row

We get the following flame graph:

Flamegraph of the first multithreading solution

If we zoom in on the right side of the image, we will see that almost 10% of the samples are for parsing the temperature into an f32 number:

Zoomed in flamegraph of the first multithreading solution

The image corresponds to this part of the code:

let temperature: f32 = temperature.parse().expect("Incorrect temperature");

We know that our test file contains all temperatures in one of the following two formats:

  1. ab.c (e.g., 12.5)
  2. b.c (e.g., 5.4)

Also, in case of negative numbers it has a - right before.

Knowing this, we update our code to read each line of the file chunk as bytes instead of strings, and write a function that manually parses the temperature bytes into a fixed-point i32 holding tenths of a degree (12.5 becomes 125). This should be faster than converting the file bytes to a string and then parsing an f32.

// Assumes every temperature has 1-2 integer digits and exactly 1 decimal digit.
// V is the fixed-point value type; the text above describes it as an i32.
type V = i32;

fn parse_temperature(mut s: &[u8]) -> V {
    let neg = if s[0] == b'-' {
        s = &s[1..];
        true
    } else {
        false
    };

    let (a, b, c) = match s {
        [a, b, b'.', c] => (a - b'0', b - b'0', c - b'0'),
        [b, b'.', c] => (0, b - b'0', c - b'0'),
        _ => panic!("Unknown pattern {:?}", std::str::from_utf8(s).unwrap()),
    };

    let v = (a as V) * 100 + (b as V) * 10 + (c as V);

    if neg {
        -v
    } else {
        v
    }
}

This is how the function works:

  1. It first checks if the first byte (character) in the input slice s is a minus sign (-). If so, it removes it.
  2. It uses pattern matching to handle the two formats of the temperature:
    • [a, b, b'.', c]: This pattern matches when the input has two digits before the decimal point and one digit after it. For example, 23.4.
    • [b, b'.', c]: This pattern matches when the input has one digit before the decimal point and one digit after it. For example, 3.4.
  3. For both patterns it extracts the numeric values of the digits by subtracting the ASCII value of 0 from each byte. This converts the ASCII byte representation of a digit to its actual integer value.
  4. It calculates the temperature by combining the digits:
    • a is multiplied by 100.
    • b is multiplied by 10.
    • c is used as is.
  5. The sum of these products gives the temperature. But notice that we need to divide it by 10 at the printing step.
  6. If the number is negative it returns -v.

Note that if we want to support other formats, we need to update the function by adding a new branch to the match statement.
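Since the parsed value is in tenths of a degree, the printing step has to turn it back into a decimal. A minimal sketch of that step follows; `format_tenths` and `mean_tenths` are hypothetical helpers, and rounding the mean to the nearest tenth (halves away from zero) is my assumption rather than something this post specifies.

```rust
/// Formats a temperature stored as tenths of a degree, e.g. -125 -> "-12.5".
fn format_tenths(v: i32) -> String {
    // Handle the sign separately: -5 / 10 is 0 in integer arithmetic,
    // which would silently drop the minus sign of -0.5.
    let sign = if v < 0 { "-" } else { "" };
    let abs = v.unsigned_abs();
    format!("{sign}{}.{}", abs / 10, abs % 10)
}

/// Mean of `count` readings whose sum is `sum` tenths, rounded to the nearest
/// tenth with halves rounded away from zero (an assumed rounding rule).
fn mean_tenths(sum: i64, count: u32) -> i32 {
    (sum as f64 / count as f64).round() as i32
}
```

For example, `format_tenths(125)` gives "12.5" and `format_tenths(-5)` gives "-0.5". Accumulating the sum as an integer also avoids floating-point error building up over a billion additions.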

After this change, we have improved the execution time from 17.96 seconds to 8.1 seconds. We are getting closer!
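The other half of this change, reading each line as bytes, could be sketched as follows. `for_each_row` is a hypothetical helper, not the repository's function; it assumes lines end in `\n` (no `\r\n`) and skips lines without a `;`.

```rust
use std::io::BufRead;

/// Calls `on_row(city_bytes, temperature_bytes)` for every `city;temp` line,
/// reusing one buffer so that no String is allocated per row.
fn for_each_row<R: BufRead>(
    mut reader: R,
    mut on_row: impl FnMut(&[u8], &[u8]),
) -> std::io::Result<()> {
    let mut buf = Vec::with_capacity(128);
    loop {
        buf.clear();
        if reader.read_until(b'\n', &mut buf)? == 0 {
            return Ok(()); // end of input
        }
        // Drop the trailing newline, if any (the last line may lack one).
        let line = buf.strip_suffix(b"\n").unwrap_or(buf.as_slice());
        if let Some(pos) = line.iter().position(|&b| b == b';') {
            on_row(&line[..pos], &line[pos + 1..]);
        }
    }
}
```

Unlike lines(), read_until neither validates UTF-8 nor allocates a new String per row, so that work is avoided for each of the billion lines.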

Link to commit

Custom Key Parsing (6.76 seconds - 16.5% improvement)

Let’s now generate another flame graph of the current solution to see what our next improvement can be:

Flamegraph of the second multithreading solution

If we zoom in on the right side of the image, we will see that more than 10% of the samples are for parsing the city from a byte slice to a string:

Zoomed in flamegraph of the first multithreading solution

Let’s do the same thing we did for parsing the temperature. We are going to write a custom parser to make the program faster.

Our current StationsMap maps from city names (String) to StationMetrics. What if we change it to BTreeMap<u64, StationMetrics>? We can write a fast function that parses from a byte slice to a u64, and u64 (8 bytes) should be enough to identify a city (e.g. the first 8 characters of its name).

Having a u64 instead of a string as key also comes with the advantage of having the keys stored inline in the map.

How hash maps work

When you use String as keys in a HashMap, each key is a heap-allocated, dynamically sized string. This can have implications for performance, especially in terms of hashing and memory usage:

Hash map 1

When you use u64 as keys in a HashMap, each key is a fixed-size integer stored inline in the map’s own storage, with no separate allocation per key. This is typically more efficient in terms of both hashing and memory usage.

Hash map 2

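The difference is that a u64 key occupies a fixed 8 bytes directly inside the map’s storage, so inserting one needs no extra allocation and comparing two keys is a single integer comparison. A String key, by contrast, keeps its bytes in a separate heap allocation that has to be created on insert and followed through a pointer on every comparison. That is why by using u64 keys, we can achieve better performance for lookups, insertions, and deletions in the hash map.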

See how a value is retrieved in both cases.

HashMap with String keys:

Hash map with string keys

HashMap with u64 keys (inlined):

Hash map with u64 keys

As we can see in the graphs, the differences are that:

  1. u64 keys are stored inline in the map, so they don’t need the additional step of heap allocating the key or following a pointer to compare it.
  2. Hashing a fixed-size integer is faster.
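One way to see the difference in key layout is to compare the size of the slot the map stores for each key type. This is a small illustrative sketch; `key_slot_sizes` is a hypothetical helper.

```rust
use std::mem::size_of;

/// Size in bytes of the value the map stores for each key type.
/// For u64 this is the whole key; for String it is only the handle
/// (pointer, length, capacity), with the text living in a separate allocation.
fn key_slot_sizes() -> (usize, usize) {
    (size_of::<u64>(), size_of::<String>())
}
```

On current 64-bit targets this returns `(8, 24)`, although the exact layout of String is not guaranteed by the language. The 24 bytes do not include the city name itself, which is why every comparison of String keys has to follow a pointer.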

Code changes

Let’s then change StationsMap to BTreeMap<u64, StationMetrics> and write a function to parse the city as u64. The inconvenience is that we will need to add a string field (city) to StationMetrics to store the actual name of the city. Notice this string will be only calculated once for each city, not for the one billion rows.

fn to_key(data: &[u8]) -> u64 {
    let mut hash = 0u64;
    let len = data.len();
    unsafe {
        if len >= 8 {
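            // The slice has no 8-byte alignment guarantee, so a plain
            // dereference would be undefined behavior; read unaligned instead.
            hash = (data.as_ptr() as *const u64).read_unaligned();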
        } else {
            for i in 0..len {
                hash |= (*data.get_unchecked(i) as u64) << (i * 8);
            }
        }
    }

    hash ^= len as u64;
    hash
}

As we already said, the to_key function converts a slice of bytes into a u64 integer that is used to identify cities in our StationsMap. This is how it works:

  1. It starts by creating a variable called hash and sets it to 0.
  2. It then gets the length of the input data (number of bytes).
  3. If the data length is 8 or more bytes:
    • It directly reads the first 8 bytes and interprets them as a u64 integer. This is a fast way to create a key.
  4. If the data length is less than 8 bytes:
    • It processes each byte individually in a loop.
    • For each byte, it shifts the byte’s value by its position (multiplied by 8) and combines it with the hash using the bitwise OR operator.
  5. Finally, it adjusts the final hash by XOR-ing with the length of the data. This is to ensure that cities that start with the same 8 bytes have a different hash (assuming they have different length).
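To make the steps concrete, here is a safe sketch that should produce the same keys on little-endian machines such as my mini PC. `to_key_safe` is a hypothetical name, and I have not benchmarked it against the unsafe version.

```rust
/// Safe variant of the key function: the first 8 bytes (little-endian,
/// zero-padded for shorter names) XOR-ed with the name length.
fn to_key_safe(data: &[u8]) -> u64 {
    let mut first = [0u8; 8];
    let n = data.len().min(8);
    first[..n].copy_from_slice(&data[..n]);
    u64::from_le_bytes(first) ^ (data.len() as u64)
}
```

For the one-byte name `b"A"` the key is `0x41 ^ 1 = 0x40`. The scheme has a known limit: two names with the same length and the same first 8 bytes, such as the made-up `b"Santiago A"` and `b"Santiago B"`, get the same key and would be merged into one station, so this only works if the input has no such pair.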

Running this solution now improves the execution time from 8.1 seconds to 6.76 seconds. We still have some work to do!

Link to commit

Custom Hash Function (5.85 seconds - 13.5% improvement)

In the previous section, we explained how hash maps work. In the diagrams, we saw that retrieving a value for a given key from the hash map requires first hashing the key, then looking up that hash in the hash table, and finally retrieving the data for that hash.

A question that naturally arises now is: why do we need to hash our keys if we are already using a u64 key that identifies the city? Why not use that u64 key directly as the hash and avoid that extra step?

Let’s use a custom hasher that just returns the u64 without applying any hash function.

One inconvenience is that BTreeMap orders its keys instead of hashing them, so it has no hasher to replace; we will have to use a HashMap instead. This means we will need to sort the values before printing them.

use std::{
    collections::HashMap,
    hash::{BuildHasher, Hasher},
};

#[derive(Default)]
struct NoOpHasher {
    hash: u64,
}

impl Hasher for NoOpHasher {
    fn write(&mut self, _bytes: &[u8]) {
        panic!("NoOpHasher only supports u64 values");
    }

    fn write_u64(&mut self, i: u64) {
        self.hash = i;
    }

    fn finish(&self) -> u64 {
        self.hash
    }
}

// Default is needed so that StationsMap::default() can build the map.
#[derive(Default)]
struct NoOpBuildHasher;

impl BuildHasher for NoOpBuildHasher {
    type Hasher = NoOpHasher;

    fn build_hasher(&self) -> NoOpHasher {
        NoOpHasher::default()
    }
}

type StationsMap = HashMap<u64, StationMetrics, NoOpBuildHasher>;

There is not much to comment about this piece of code. It just does what we already mentioned: it uses the u64 value as the hash without applying any hash function.
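For completeness, here is a self-contained sketch of the same idea together with the sorting step that HashMap now requires before printing. The names `IdentityHasher`, `IdentityMap` and `sorted_names` are hypothetical, and it uses the standard library's BuildHasherDefault instead of a hand-written BuildHasher.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Hasher that passes a u64 key through unchanged.
#[derive(Default)]
struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("IdentityHasher only supports u64 keys");
    }

    fn write_u64(&mut self, i: u64) {
        self.0 = i;
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

type IdentityMap<V> = HashMap<u64, V, BuildHasherDefault<IdentityHasher>>;

/// Returns the stored station names in alphabetical order, as needed for output.
fn sorted_names(map: &IdentityMap<String>) -> Vec<&str> {
    let mut names: Vec<&str> = map.values().map(String::as_str).collect();
    names.sort_unstable();
    names
}
```

Sorting a few hundred or thousand station names once at the end is cheap compared with hashing a billion keys. Keep in mind that an identity hash is only as good as the distribution of the key bits, so it is a trade-off that happens to work for this input.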

After applying these changes and executing the code, we see an improvement from 6.76 seconds to 5.85 seconds. This is almost the same as the official Rust solution (5.7 seconds)!

Link to commit

Unsafe String Parsing (5.16 seconds - 11.8% improvement)

Our current solution is already optimized and has a performance similar to the official Rust solution. But can we do a final optimization? Let’s see what the flamegraph has to say:

Flamegraph of the custom hash solution

If we zoom in, we see that 11% of the samples are calls to core::str::converts::from_utf8:

Zoomed in flamegraph of the custom hash solution

How is this possible if we are parsing the city names as u64? Because we still convert each name to a String to store it in the StationMetrics struct for printing at the end. Even though we only do this once per city and per thread, it accounts for 11% of the samples.

How can we improve this? We know that we are calling std::str::from_utf8(city).unwrap().to_string(), which is safe and checks whether the byte slice is valid UTF-8. Since we know that our file contains valid UTF-8 strings, we can replace it with:

city: unsafe { std::str::from_utf8_unchecked(city).to_string() },

This way, we skip the UTF-8 validation. Note that it should only be used when you are certain that the byte slice is valid UTF-8; otherwise the behavior is undefined.
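If you want a safety net while developing, one option is to keep the check in debug builds only. This is a sketch with a hypothetical `city_name` helper, not what the repository does:

```rust
/// Converts station-name bytes to a String, validating UTF-8 only in debug builds.
fn city_name(bytes: &[u8]) -> String {
    debug_assert!(
        std::str::from_utf8(bytes).is_ok(),
        "station name is not valid UTF-8"
    );
    // SAFETY: the caller guarantees `bytes` is valid UTF-8 (assumed true for the
    // challenge input); debug builds verify this assumption above.
    unsafe { std::str::from_utf8_unchecked(bytes) }.to_string()
}
```

debug_assert! is compiled out in release builds by default, so the release binary pays nothing for the check while tests and debug runs still catch a malformed file.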

Now the program takes 5.16 seconds. This time, it is faster than the official Rust solution!

Link to commit

Conclusion

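In this blog post, we explored various optimizations for the 1 Billion Row Challenge in Rust, aiming to improve performance without external dependencies. We started from a basic solution that took 90 seconds and implemented several enhancements:

  1. Multithreading (17.96 seconds)
  2. Custom number parsing (8.1 seconds)
  3. Custom key parsing (6.76 seconds)
  4. Custom hash function (5.85 seconds)
  5. Unsafe string parsing (5.16 seconds)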

While our final solution is faster than the official Rust implementation, other approaches using libraries like memmap or hashmap might be even more efficient for real-world scenarios.

Keep in mind that these results are specific to my machine and test file, so performance may vary for others. I invite you to find more optimizations and share them in the comments!

Thanks for reading, and happy optimizing!

Ricardo Pallas


λ Software Engineer
