In [1]:
import polars as pl
from polars.lazy import *
import numpy as np
from string import ascii_letters
import pandas as pd
import os
from typing import Union

# Laziness

Py-polars has a lazy API that supports a subset of the eager API. Laziness means that operations aren't executed until you ask for their result. Let's start with a short example.

Below we'll create a DataFrame in an eager fashion (meaning that the creation of the DataFrame is executed at once).

In [2]:
df = pl.DataFrame({"a": np.arange(0, 10),
 "b": np.random.rand(10),
 "c": list(ascii_letters[:10])})
df

+-----+-------+-----+
| a | b | c |
| --- | --- | --- |
| i64 | f64 | str |
| 0 | 0.111 | "a" |
+-----+-------+-----+
| 1 | 0.034 | "b" |
+-----+-------+-----+
| 2 | 0.56 | "c" |
+-----+-------+-----+
| 3 | 0.142 | "d" |
+-----+-------+-----+
| 4 | 0.584 | "e" |
+-----+-------+-----+
| 5 | 0.537 | "f" |
+-----+-------+-----+
| 6 | 0.643 | "g" |
+-----+-------+-----+
| 7 | 0.349 | "h" |
+-----+-------+-----+
| 8 | 0.716 | "i" |
+-----+-------+-----+
| 9 | 0.451 | "j" |
+-----+-------+-----+

## Lazy DataFrame
To make this a lazy DataFrame we call the `.lazy()` method. As we can see, not much happens.

In [3]:
ldf = df.lazy()
ldf



We can filter the rows of this lazy `DataFrame`, but we'll see that again nothing happens.

*Note that `col` and `lit` (meaning **column** and **literal value**) are part of the lazy **DSL** (domain-specific language) and are needed to build a query plan.*

In [4]:
ldf = ldf.filter(col("a") == (lit(2)))
ldf



The query is only executed when we ask for it. This can be done with the `.collect()` method.
Below we execute the query and obtain our results.

In [5]:
ldf.collect()

+-----+------+-----+
| a | b | c |
| --- | --- | --- |
| i64 | f64 | str |
| 2 | 0.56 | "c" |
+-----+------+-----+
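
To make the idea concrete, here is a minimal pure-Python sketch of lazy evaluation. `ToyLazyFrame` is a hypothetical class made up for this illustration; it is not how polars is implemented, it only mimics the pattern of recording steps and running them on `collect`.

```python
# Toy illustration of lazy evaluation; NOT the polars implementation.
class ToyLazyFrame:
    def __init__(self, rows, plan=()):
        self.rows = rows          # list of dicts, one per row
        self.plan = tuple(plan)   # recorded predicates, not yet executed

    def filter(self, predicate):
        # Only record the predicate; no row is touched here.
        return ToyLazyFrame(self.rows, self.plan + (predicate,))

    def collect(self):
        # Execute every recorded predicate in one pass over the data.
        return [row for row in self.rows
                if all(pred(row) for pred in self.plan)]


rows = [{"a": i, "c": "abcdefghij"[i]} for i in range(10)]
query = ToyLazyFrame(rows).filter(lambda row: row["a"] == 2)
print(len(query.plan))   # 1 recorded step, nothing executed yet
print(query.collect())   # [{'a': 2, 'c': 'c'}]
```

Because the plan is just data until `collect` is called, an optimizer gets the chance to rewrite it first.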

# Why lazy?
This laziness opens up quite some cool possibilities from an optimization perspective.
It allows polars to modify the query right before executing it and make suboptimal queries more performant. Let's show this using various operations, comparing lazy execution with eager execution in both Polars and Pandas.

Let's create two DataFrames with a fair number of columns and rows.

In [6]:
def rand_string(n: int, set_size: int, lower=True) -> str:
    s = "".join(np.random.choice(list(ascii_letters[:set_size]), n))
    if lower:
        return s.lower()
    return s

In [7]:
rows = 100_000
columns = 30
key_size = 5
key_set_size = 4

np.random.seed(1)

dtypes = [np.float32, np.float64, np.int64]  # np.int was removed in NumPy 1.24

df_a = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key", np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_a.insert_at_idx(0, s)

rows = 80_000
columns = 8
df_b = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key", np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_b.insert_at_idx(0, s)


print("Showing a subset of df_a:")
# only show a sub_slice
df_a[:3, :10]

Showing a subset of df_a:


+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| key | column_0 | column_1 | column_2 | column_3 | column_4 | column_5 | column_6 | column_7 | column_8 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| str | f64 | f32 | f64 | f32 | i64 | f64 | f64 | f64 | i64 |
| "aacbb" | 4.17 | 6.003 | 4.74 | 0.447 | 3 | 0.43 | 3.206 | 6.397 | 7 |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| "abbca" | 7.203 | 1.257 | 3.716 | 5.109 | 0 | 8.854 | 5.967 | 5.141 | 5 |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| "accac" | 0.001 | 3.436 | 4.738 | 4.253 | 8 | 6.743 | 5.28 | 4.888 | 0 |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+

In [8]:
df_a_pd = df_a.to_pandas()
df_b_pd = df_b.to_pandas()

# Update 21-10-2020
Arrow 2.0 is out and Polars is also faster in filtering. :)

# Where Polars slightly loses (or wins?)
Let's start with filtering, an operation where polars used to be slower than pandas. A filter predicate creates a boolean array. Polars/Arrow stores these booleans not as 1-byte values but as bits, meaning 1 byte stores 8 booleans. This reduces memory 8-fold, but adds some overhead on array creation. Earlier measurements had pandas more than 5x faster here; in the timings shown below the two are close (99.8 µs for polars versus 119 µs for pandas), though there is a huge spread.
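
The bit-packing described above can be sketched in plain Python. The helpers `pack_bits` and `get_bit` are hypothetical names for this illustration, and the least-significant-bit-first layout is an assumption of the sketch. The point is only that 8 booleans fit in 1 byte, at the cost of some extra shifting and masking when the array is built or read.

```python
def pack_bits(mask):
    """Pack a list of booleans into bytes, 8 booleans per byte (LSB first)."""
    packed = bytearray((len(mask) + 7) // 8)
    for i, flag in enumerate(mask):
        if flag:
            packed[i // 8] |= 1 << (i % 8)
    return bytes(packed)


def get_bit(packed, i):
    """Read boolean number i back out of the packed buffer."""
    return bool(packed[i // 8] & (1 << (i % 8)))


values = [0.5, 3.2, 0.1, 7.7, 9.0, 0.9, 4.4, 2.2, 0.3, 6.1]
mask = [value < 1 for value in values]   # the filter predicate
packed = pack_bits(mask)
print(len(mask), "booleans stored in", len(packed), "bytes")
```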

Pandas has something called a blockmanager, which hugely increases filtering performance (I believe due to cache optimality). However, this blockmanager incurs performance hits when blocks are modified and block consolidation is triggered. Block consolidation is triggered:

* when the blockmanager has more than 100 blocks
* when a groupby operation is executed
* by these operations: `diff`, `take`, `xs`, `reindex`, `_is_mixed_type`, `_is_numeric_mixed_type`, `values`, `fillna`, `replace`, `resample`, `concat`

Read more about the [blockmanager](https://uwekorn.com/2020/05/24/the-one-pandas-internal.html). 

In [9]:
%%timeit
df_a["column_2"] < 1

10000 loops, best of 5: 99.8 µs per loop


In [10]:
%%timeit
df_a_pd["column_2"] < 1

The slowest run took 35.76 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 5: 119 µs per loop


If we use this mask to select rows from the DataFrame, the cost in polars grows linearly with the number of columns. Earlier runs had pandas about **1.2x** faster when filtering a single-column DataFrame; in the timings below polars takes 1.32 ms and pandas 2.32 ms. Either way the runtime is on the order of 1 ms, so the operations are fast.

In [11]:
%%timeit
df_a[:, :1][df_a["column_2"] < 1]

1000 loops, best of 5: 1.32 ms per loop


In [12]:
%%timeit
df_a_pd.iloc[:, :1][df_a_pd["column_2"] < 1]

100 loops, best of 5: 2.32 ms per loop


~The performance stays approximately the same with more columns. Here we observe that with 30 columns, polars is still **~1.2x** slower.~
* Pandas has great row-wise filtering due to the blockmanager (with unexpected performance hits).
* Polars has great row-wise filtering because the work is embarrassingly parallel.

In [13]:
%%timeit
df_a[df_a["column_2"] < 1]

100 loops, best of 5: 2.33 ms per loop


In [14]:
%%timeit
df_a_pd[df_a_pd["column_2"] < 1]

100 loops, best of 5: 2.41 ms per loop


# Where polars definitely wins
However, polars wins in all the expensive operations. Joins and groupby operations take most of the running time of a query. Below we see that a join in polars is more than **3x** faster than a join in pandas, and that a join can take **1000-3000x** the runtime of a DataFrame filter.

It's better to be fast in the expensive operations.
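
A rough back-of-the-envelope calculation shows why the join is so expensive here. Assuming the keys are sampled uniformly (which is what `np.random.choice` does by default in `rand_string`), the expected number of rows produced by the inner join follows from the parameters used to build `df_a` and `df_b`:

```python
# Parameters copied from the cell that builds df_a and df_b.
rows_a, rows_b = 100_000, 80_000
key_size, key_set_size = 5, 4

# Every key is 5 characters drawn from 4 letters.
distinct_keys = key_set_size ** key_size

# Under uniform sampling, each row of df_a matches rows_b / distinct_keys
# rows of df_b on average.
expected_join_rows = rows_a * rows_b / distinct_keys
print(distinct_keys, expected_join_rows)   # 1024 7812500.0
```

So the join is expected to materialise several million rows, while a filter on `df_a` only ever looks at 100,000.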

In [15]:
%%timeit
df_a.join(df_b, left_on="key", right_on="key", how="inner").shape

1 loop, best of 5: 1.06 s per loop


In [16]:
%%timeit
df_a_pd.merge(df_b_pd, left_on="key", right_on="key", how="inner").shape

1 loop, best of 5: 3.95 s per loop


In the groupby operation with an aggregation on all the columns we see that polars is about **5x** faster (3.28 ms versus 16.2 ms). Again, polars is embarrassingly parallel, which means that it can be slower than pandas due to parallelization overhead. However, when this is the case it hardly matters, because you are paying only a few ms extra for parallelization.

In [17]:
%%timeit
df_a.groupby(["key"]).first().shape

100 loops, best of 5: 3.28 ms per loop


In [18]:
%%timeit
df_a_pd.groupby("key").first().shape

100 loops, best of 5: 16.2 ms per loop


# Query optimization
Filtering a DataFrame leads to a new allocation. A common sub-optimal query applies multiple filters one after another, allocating at every step.
Let's see if laziness can help optimize that.

In [19]:
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df
 
assert eager(df_a_pd).shape == eager(df_a).shape
eager(df_a_pd).shape

(58019, 31)

## Eager polars

In [20]:
%%timeit
eager(df_a)

10 loops, best of 5: 18.7 ms per loop


## Eager pandas

In [21]:
%%timeit
eager(df_a_pd)

10 loops, best of 5: 20.2 ms per loop


In [22]:
def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy().filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1)))

## Lazy polars

In [23]:
%%timeit
lazy_query(df_a).collect()

100 loops, best of 5: 5.88 ms per loop


## Optimization: Combine predicates
Above, the query optimizer combined all the filters and executed them at once. This avoids the extra allocation made at every filter operation.
With this optimization we don't incur a performance hit by carelessly filtering at different locations in a query.

In the timings above, this makes the lazy query roughly **3x** faster than the eager polars version (5.88 ms versus 18.7 ms).
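
The effect of combining predicates can be sketched without polars. The two helper functions below are hypothetical and work on plain lists of dicts; they only count how many intermediate results each strategy allocates, which is the cost the paragraph above talks about.

```python
def filter_sequentially(rows, predicates):
    """Apply one filter at a time; every step allocates a new list."""
    allocations = 0
    for pred in predicates:
        rows = [row for row in rows if pred(row)]
        allocations += 1
    return rows, allocations


def filter_combined(rows, predicates):
    """Check all predicates per row in a single pass: one allocation."""
    kept = [row for row in rows if all(pred(row) for pred in predicates)]
    return kept, 1


rows = [{"column_2": i % 10, "column_3": (i * 3) % 10} for i in range(1000)]
predicates = [lambda r: r["column_2"] < 9, lambda r: r["column_3"] > 1]

seq_rows, seq_allocs = filter_sequentially(rows, predicates)
comb_rows, comb_allocs = filter_combined(rows, predicates)
print(seq_rows == comb_rows, seq_allocs, comb_allocs)   # True 2 1
```

Both strategies return the same rows; the combined version simply skips the intermediate copies.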

## Optimization: Projection pushdown (selecting important columns)
Let's look at another optimization. Let's say we are only interested in the columns `"key"` and `"column_1"`. 

A suboptimal eager query could be written like below. This query could be more performant if the projection (selecting columns) was done before the selection (filtering rows).
Below we see that the lazy query is optimized and selects the needed columns before doing the filter operation. This speeds up the query by **~1.5x** by not filtering columns that are not part of the result.

In [24]:
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df[["key", "column_1"]]

def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy().filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1))
            .select([col("key"), col("column_1")]))

## Eager polars

In [25]:
%%timeit
eager(df_a)

10 loops, best of 5: 19.1 ms per loop


## Eager pandas

In [26]:
%%timeit
eager(df_a_pd)

10 loops, best of 5: 23.9 ms per loop


## Lazy polars

In [27]:
%%timeit
lazy_query(df_a).collect()


100 loops, best of 5: 3.97 ms per loop
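
Why projecting first helps can be shown with a toy column store. The `take` helper and the small `table` below are made up for this sketch and say nothing about polars internals; they only count how many columns have to be copied when a row mask is applied.

```python
def take(columns, mask):
    """Copy the masked rows of every given column.

    Returns the filtered columns and the number of columns copied.
    """
    out = {name: [v for v, keep in zip(values, mask) if keep]
           for name, values in columns.items()}
    return out, len(columns)


# 30 numeric columns plus a key column, 100 rows each.
table = {f"column_{i}": [(i + j) % 10 for j in range(100)] for i in range(30)}
table["key"] = [f"k{j % 4}" for j in range(100)]
mask = [v < 9 for v in table["column_2"]]
wanted = ["key", "column_1"]

# Filter first, project last: all 31 columns are copied.
filtered, copied_late = take(table, mask)
late = {name: filtered[name] for name in wanted}

# Project first, filter last: only the 2 requested columns are copied.
early, copied_early = take({name: table[name] for name in wanted}, mask)

print(late == early, copied_late, copied_early)   # True 31 2
```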


# Optimization: Predicate pushdown
The same trick can be done with predicates. A sub-optimal query would do the filter after an expensive join operation.

In [28]:
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    df_a = df_a[df_a["column_1"] < 1]
    df_b = df_b[df_b["column_1"] < 1]
    # pandas
    if hasattr(df_a, "values"):
        return df_a.merge(df_b, left_on="key", right_on="key")
    return df_a.join(df_b, left_on="key", right_on="key")


## Eager polars; filter before join

In [29]:
%%timeit
eager(df_a, df_b)

100 loops, best of 5: 11.9 ms per loop


## Eager pandas; filter before join

In [30]:
%%timeit
eager(df_a_pd, df_b_pd)

10 loops, best of 5: 43.9 ms per loop


In [31]:
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    # pandas
    if hasattr(df_a, "values"):
        df = df_a.merge(df_b, left_on="key", right_on="key")
        df = df[df["column_1_x"] < 1]
        return df
    df = df_a.join(df_b, left_on="key", right_on="key")
    df = df[df["column_1"] < 1]
    return df


## Eager polars; filter after join

In [32]:
%%timeit
eager(df_a, df_b)

1 loop, best of 5: 1.18 s per loop


## Eager pandas; filter after join

In [33]:
%%timeit
eager(df_a_pd, df_b_pd)

1 loop, best of 5: 4.83 s per loop


In [34]:
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"))
            .filter(col("column_1") < lit(1))
            )


## Lazy polars; filter after join

In [35]:
%%timeit
lazy_query(df_a, df_b).collect().shape

100 loops, best of 5: 11.6 ms per loop


As we can see, filtering after the join instead of before it has a large effect: in the eager polars timings above it slows the query down by roughly **100x** (1.18 s versus 11.9 ms).
In the lazy variant, the optimizer pushed the predicates down so that they are executed before the join.
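
The size of the intermediate result is what makes the difference. Below is a small pure-Python sketch with a hypothetical `hash_join` helper on lists of dicts (not the polars join). For simplicity it pushes a filter down on the left table only, and compares how many rows each ordering has to materialise.

```python
from collections import defaultdict


def hash_join(left, right, key):
    """Inner join on `key`; right-hand columns get a `_right` suffix."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    return [{**l, **{f"{k}_right": v for k, v in r.items() if k != key}}
            for l in left for r in index.get(l[key], [])]


left = [{"key": "ab"[i % 2], "column_1": i % 10} for i in range(200)]
right = [{"key": "ab"[i % 2], "value": i} for i in range(100)]

# Filter after the join: every matching pair is materialised first.
joined = hash_join(left, right, "key")
after = [row for row in joined if row["column_1"] < 1]

# Filter pushed below the join: only the surviving left rows are joined.
small_left = [row for row in left if row["column_1"] < 1]
before = hash_join(small_left, right, "key")

print(len(joined), len(before), after == before)   # 10000 1000 True
```

Both orderings give the same answer, but the pushed-down version never builds the 10,000-row intermediate result.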

# Some other queries

In [36]:
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
            .filter(col("column_1") < lit(1))
            .groupby("key")
            .agg([col("column_0").agg_sum()])
            .select([col("key"), col("column_0_sum")])
            )

In [37]:
%%timeit
lazy_query(df_a, df_b).collect()

100 loops, best of 5: 13.7 ms per loop


## Udf (User defined functions <3 Laziness)
The lazy API also has access to all the eager operations on `Series`, because UDFs come with almost no overhead (no serializing or pickling). Below we'll add a column `"udf"` using a `lambda` and the help of the eager API. It still needs some polishing, as we need to make sure that we don't modify the dtypes. I hope you can imagine that this can be very powerful! :)


In [38]:
%%time
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
            .filter(col("column_1") < lit(1))
            .groupby("key")
            .agg([col("column_0").agg_sum(), col("column_2").agg_max().alias("foo")])
            .with_column(col("foo").apply(
                lambda series: pl.Series("", np.ones(series.len(), dtype=np.float32) * series.sum())
            ).alias('udf'))
            .select([col("key"), col("column_0_sum"), col("udf"), col("foo")])
            )

lazy_query(df_a, df_b).collect()

CPU times: user 76.1 ms, sys: 0 ns, total: 76.1 ms
Wall time: 16.3 ms


In [51]:
# More coming up later.