# Memory management in large jobs

Users may experience out-of-memory errors when running large linkage and deduplication jobs with `splink`'s default settings.

This notebook demonstrates how `splink` can be configured to reduce the amount of memory needed.

However, before demonstrating these configuration settings, it's important to understand why you might be running out of memory.

## Why might you run out of memory?

---

#### 1. Your blocking rules generate too many comparisons, or you are computing a large cartesian product


⚠️ **Increasing the size of your Spark cluster or changing `splink` configuration options other than blocking rules is unlikely to solve this problem.**


Imagine you are running a deduplication job on a large input dataset called `df`, and one of your blocking rules is:

`"l.first_name = r.first_name"`

Let's assume there are 10,000 people in your dataset with the first name 'John'.

`splink` will perform a join that looks a bit like this:

```sql
select *
from df as l
inner join df as r
on l.first_name = r.first_name
```

This will generate 10,000 * 10,000 = 100 million rows for the Johns alone, all _on a single node_ of your Spark cluster.  This happens because when Spark performs a join, it hash partitions on the join key, so all records with the same `first_name` go to the same node.



#### Solution: Tighten your blocking rules so they generate fewer comparisons.  
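To gauge whether a blocking rule is tight enough before running a job, it can help to estimate how many rows the self-join would produce. The sketch below does this in plain Python for a small in-memory sample. `estimate_join_rows` and the sample records are hypothetical and are not part of `splink`; the function simply mirrors the join shown above, so each group of n matching records contributes n * n rows.

```python
from collections import Counter


def estimate_join_rows(records, key_columns):
    """Rows produced by a self-join on key_columns: a group of n records gives n * n rows."""
    group_sizes = Counter(tuple(rec[col] for col in key_columns) for rec in records)
    return sum(n * n for n in group_sizes.values())


# Hypothetical sample: 10,000 Johns spread evenly over 100 dates of birth
records = [
    {"first_name": "John", "dob": f"19{i % 100:02d}-01-01"} for i in range(10_000)
]

loose = estimate_join_rows(records, ["first_name"])
tight = estimate_join_rows(records, ["first_name", "dob"])
print(f"first_name only:    {loose:,} rows")  # 100,000,000
print(f"first_name and dob: {tight:,} rows")  # 1,000,000
```

In this sample the tighter key corresponds to a blocking rule such as `"l.first_name = r.first_name and l.dob = r.dob"`, and it cuts the rows generated by a factor of 100.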

---


#### 2. You're getting memory errors at the blocking stage despite having tight blocking rules

⚠️ **Increasing the size of your Spark cluster will sometimes mitigate these issues, but there is a more efficient solution.**

This is often because Spark's default of 200 shuffle partitions is not well suited to the task of generating comparisons from blocking rules.

When generating record comparisons from your blocking rules, your input dataset will be split into 200 parts (tasks).

This is generally insufficient for large jobs because, whilst 200 parts may be enough for your input dataset, the dataset of comparisons generated by your blocking rules is typically 10 times as large or more.

Furthermore, blocking rules can often result in skew - e.g. the number of comparisons generated by 'John's is larger than the number generated by 'Robin's - meaning that if a few 'John's happen to end up on the same node, it can run out of memory.
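The effect of skew can be illustrated with a small simulation. This is a minimal sketch, not Spark's actual partitioner: it uses `zlib.crc32` as a stand-in for Spark's own hash function, and the name counts and the helper `rows_per_partition` are hypothetical. It assigns each blocking key to one partition and totals the n * n rows each partition would have to produce.

```python
import zlib
from collections import defaultdict


def rows_per_partition(key_counts, num_partitions):
    """Assign each key to one partition by hash and total the n * n join rows per partition."""
    load = defaultdict(int)
    for key, n in key_counts.items():
        # Stand-in hash; every record sharing a key lands in the same partition
        partition = zlib.crc32(key.encode("utf-8")) % num_partitions
        load[partition] += n * n
    return dict(load)


# Hypothetical number of records per first name
key_counts = {"John": 10_000, "Julia": 2_000, "Robin": 500, "Amara": 50}

for num_partitions in (200, 1000):
    load = rows_per_partition(key_counts, num_partitions)
    print(num_partitions, "partitions, heaviest partition:", f"{max(load.values()):,} rows")
```

Whatever the partition count, the heaviest partition can never hold fewer rows than the largest single key generates. More partitions make it less likely that several heavy keys collide, but they cannot split one key, which is why tight blocking rules come first.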


#### Solution: Increase Spark's parallelism

Try increasing Spark's parallelism as follows (start with 1000, but try different values):
```python
spark.conf.set("spark.sql.shuffle.partitions", "1000")
spark.conf.set("spark.default.parallelism", "1000")
```
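As a rough guide when trying different values, the arithmetic below shows how the average partition size shrinks as the partition count grows. The total of 500 million comparisons is a hypothetical figure, and the calculation assumes rows are spread evenly, which skewed blocking rules will not achieve in practice.

```python
def average_rows_per_partition(total_rows, num_partitions):
    """Average number of rows per shuffle partition if rows were spread evenly."""
    return total_rows / num_partitions


total_comparisons = 500_000_000  # hypothetical size of the comparisons dataset

for num_partitions in (200, 1000, 5000):
    avg = average_rows_per_partition(total_comparisons, num_partitions)
    print(f"{num_partitions:>5} partitions -> {avg:,.0f} rows each on average")
```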

---

#### 3. Failing to 'break the lineage'.

⚠️  **Increasing the size of your Spark cluster will sometimes mitigate these issues but there is a more efficient solution**

By default, `splink` attempts to compute its results as a single spark job.  However, there is a [known problem](https://www.pdl.cmu.edu/PDL-FTP/Storage/CMU-PDL-18-101.pdf) in Spark of long lineage, and iterative algorithms like the one used by `splink` are often cited as the culprit.  

Breaking the lineage means splitting the job up into smaller parts which can be computed independently.  This can result in somewhat longer execution times (perhaps 30% longer), but often allows the same job to successfully complete on a smaller cluster.

`splink` has some configuration options which allow you to break the lineage.  This must be configured by the user because the solution will depend on the specifics of your spark cluster and/or cloud provider.

In the remainder of this notebook we give a simple example of breaking the lineage, using the same job as in the [data deduplication quick start](quickstart_demo_deduplication.ipynb).

---

#### 4. You need a bigger cluster

If you've tried all of the above and are still getting out of memory errors, you may need a bigger cluster, or higher memory nodes.

As a guide, we've successfully processed 15 million input records and 170 million comparisons using AWS Glue with a cluster of size 5.

## Customising a job to break the lineage

## Step 1:  Imports and setup

The following is boilerplate code that sets up the Spark session and sets some other non-essential configuration options.

```python
import pandas as pd

pd.options.display.max_columns = 500
pd.options.display.max_rows = 100
```

```python
import logging

logging.basicConfig()
logging.getLogger("splink").setLevel(logging.INFO)
```

```python
from utility_functions.demo_utils import get_spark

spark = get_spark()  # See utility_functions/demo_utils.py for how to set up Spark
```

## Step 2: Write two functions to break the lineage

In the underlying `splink` algorithm, there are two sensible places to break the lineage:
- After comparisons have been generated and the comparison vector has been computed.  (These need to be computed once but are used many times during the EM algorithm)
- After the EM algorithm has completed iterating, but before term frequency adjustments are computed.

`splink` allows the user to provide two custom functions that are executed in these two places to break the lineage.  

They both take a dataframe and the Spark session as arguments, and must return a dataframe whose lineage has been broken.  Usually this is done by:
- writing the dataframe to disk, and loading it back from disk (as in this example)
- writing the dataframe to cloud storage, and loading it back from cloud storage.
- `persist`ing, `checkpoint`ing or `cache`ing the dataframe (although in our experience, this has been unreliable)


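```python
def blocked_comparisons_to_disk(df, spark):
    # Write the comparison vectors to disk, then read them back to break the lineage
    df.write.mode("overwrite").parquet("gammas_temp/")
    df_new = spark.read.parquet("gammas_temp/")
    return df_new


def scored_comparisons_to_disk(df, spark):
    # Write the scored comparisons to disk, then read them back to break the lineage
    df.write.mode("overwrite").parquet("df_e_temp/")
    df_new = spark.read.parquet("df_e_temp/")
    return df_new
```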

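The two functions above differ only in the path they write to, so the same pattern can be wrapped in a small factory when the data should go to cloud storage instead. This is a sketch under assumptions: `make_break_lineage` is a hypothetical helper, not part of `splink`, and the bucket paths are placeholders that would need to point at a location your cluster can read and write.

```python
def make_break_lineage(path):
    """Return a function that writes a dataframe to `path` as parquet and reads it back."""

    def break_lineage(df, spark):
        # Writing and re-reading gives Spark a fresh starting point for the query plan
        df.write.mode("overwrite").parquet(path)
        return spark.read.parquet(path)

    return break_lineage


# Hypothetical locations; replace with paths that exist in your environment
blocked_comparisons_to_storage = make_break_lineage("s3://my-bucket/splink/gammas_temp/")
scored_comparisons_to_storage = make_break_lineage("s3://my-bucket/splink/df_e_temp/")
```

The returned functions have the same `(df, spark)` signature as the ones above, so they could be passed to `splink` in the same way.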
## Step 3: Set up a `splink` job, providing the custom functions as arguments


```python
from splink import Splink

df = spark.read.parquet("data/fake_1000.parquet")

settings = {
    "link_type": "dedupe_only",
    "blocking_rules": [
        "l.first_name = r.first_name",
        "l.surname = r.surname",
        "l.dob = r.dob",
    ],
    "comparison_columns": [
        {
            "col_name": "first_name",
            "num_levels": 3,
            "term_frequency_adjustments": True,
        },
        {
            "col_name": "surname",
            "num_levels": 3,
            "term_frequency_adjustments": True,
        },
        {"col_name": "dob"},
        {"col_name": "city"},
        {"col_name": "email"},
    ],
    "additional_columns_to_retain": ["group"],
    "em_convergence": 0.01,
}

# Pass the two lineage-breaking functions defined in Step 2
linker = Splink(
    settings,
    df,
    spark,
    break_lineage_blocked_comparisons=blocked_comparisons_to_disk,
    break_lineage_scored_comparisons=scored_comparisons_to_disk,
)
df_e = linker.get_scored_comparisons()
```

```
INFO:splink.iterate:Iteration 0 complete
INFO:splink.model:The maximum change in parameters was 0.49493410587310793 for key surname, level 2
INFO:splink.iterate:Iteration 1 complete
INFO:splink.model:The maximum change in parameters was 0.12059885263442993 for key surname, level 2
INFO:splink.iterate:Iteration 2 complete
INFO:splink.model:The maximum change in parameters was 0.039048194885253906 for key surname, level 0
INFO:splink.iterate:Iteration 3 complete
INFO:splink.model:The maximum change in parameters was 0.017310582101345062 for key dob, level 1
INFO:splink.iterate:Iteration 4 complete
INFO:splink.model:The maximum change in parameters was 0.014871925115585327 for key email, level 0
INFO:splink.iterate:Iteration 5 complete
INFO:splink.model:The maximum change in parameters was 0.011928170919418335 for key email, level 0
INFO:splink.iterate:Iteration 6 complete
INFO:splink.model:The maximum change in parameters was 0.009333670139312744 for key email, level 1
INFO:splink.iterat
```

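The log above suggests how the `em_convergence` setting of 0.01 relates to the iterations. The sketch below assumes that iteration stops once the maximum parameter change falls below that threshold, which is consistent with the log but is an assumption about `splink`'s stopping rule rather than its actual implementation. `first_iteration_below` is a hypothetical helper.

```python
def first_iteration_below(max_changes, threshold):
    """Index of the first iteration whose maximum parameter change is below threshold, else None."""
    for iteration, change in enumerate(max_changes):
        if change < threshold:
            return iteration
    return None


# Maximum parameter changes from the log above, rounded, for iterations 0 to 6
max_changes = [0.4949, 0.1206, 0.0390, 0.0173, 0.0149, 0.0119, 0.0093]

print(first_iteration_below(max_changes, threshold=0.01))  # 6
```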
## Step 4: Inspect results



```python
# Inspect the main dataframe that contains the match scores
cols_to_inspect = [
    "match_probability", "unique_id_l", "unique_id_r", "group_l", "group_r",
    "first_name_l", "first_name_r", "surname_l", "surname_r",
    "dob_l", "dob_r", "city_l", "city_r", "email_l", "email_r",
]

df_e.toPandas()[cols_to_inspect].sort_values(["unique_id_l", "unique_id_r"]).head(5)
```

|   | match_probability | unique_id_l | unique_id_r | group_l | group_r | first_name_l | first_name_r | surname_l | surname_r | dob_l | dob_r | city_l | city_r | email_l | email_r |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 0.955579 | 0 | 1 | 0 | 0 | Julia | Julia |  | Taylor | 2015-10-29 | 2015-07-31 | London | London | hannah88@powers.com | hannah88@powers.com |
| 1 | 0.955579 | 0 | 2 | 0 | 0 | Julia | Julia |  | Taylor | 2015-10-29 | 2016-01-27 | London | London | hannah88@powers.com | hannah88@powers.com |
| 0 | 0.77138 | 0 | 3 | 0 | 0 | Julia | Julia |  | Taylor | 2015-10-29 | 2015-10-29 | London |  | hannah88@powers.com | hannah88opowersc@m |
| 4 | 0.939962 | 1 | 2 | 0 | 0 | Julia | Julia | Taylor | Taylor | 2015-07-31 | 2016-01-27 | London | London | hannah88@powers.com | hannah88@powers.com |
| 3 | 0.02255 | 1 | 3 | 0 | 0 | Julia | Julia | Taylor | Taylor | 2015-07-31 | 2015-10-29 | London |  | hannah88@powers.com | hannah88opowersc@m |
