{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Linking in Spark\n", "\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n" ] } ], "source": [ "from splink.spark.jar_location import similarity_jar_location\n", "\n", "from pyspark import SparkContext, SparkConf\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql import types\n", "\n", "conf = SparkConf()\n", "# This parallelism setting is only suitable for a small toy example\n", "conf.set(\"spark.driver.memory\", \"12g\")\n", "conf.set(\"spark.default.parallelism\", \"16\")\n", "\n", "\n", "# Add custom similarity functions, which are bundled with Splink\n", "# documented here: https://github.com/moj-analytical-services/splink_scalaudfs\n", "path = similarity_jar_location()\n", "conf.set(\"spark.jars\", path)\n", "\n", "sc = SparkContext.getOrCreate(conf=conf)\n", "\n", "spark = SparkSession(sc)\n", "spark.sparkContext.setCheckpointDir(\"./tmp_checkpoints\")\n", "\n", "# Register the jaro winkler custom udf\n", "spark.udf.registerJavaFunction(\n", " \"jaro_winkler\", \"uk.gov.moj.dash.linkage.JaroWinklerSimilarity\", types.DoubleType()\n", ")\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import pandas as pd \n", "df = spark.read.csv(\"./data/fake_1000.csv\", header=True)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import splink.spark.spark_comparison_library as cl\n", "\n", "settings = {\n", " \"link_type\": \"dedupe_only\",\n", " \"comparisons\": [\n", " cl.jaro_winkler_at_thresholds(\"first_name\", 0.8),\n", " cl.jaro_winkler_at_thresholds(\"surname\", 0.8),\n", " cl.levenshtein_at_thresholds(\"dob\"),\n", " cl.exact_match(\"city\", term_frequency_adjustments=True),\n", " cl.levenshtein_at_thresholds(\"email\"),\n", " ],\n", " \"blocking_rules_to_generate_predictions\": [\n", " \"l.first_name = r.first_name\",\n", " \"l.surname = r.surname\",\n", " ],\n", " \"retain_matching_columns\": True,\n", " \"retain_intermediate_calculation_columns\": True,\n", " \"em_convergence\": 0.01\n", "}" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/robinlinacre/Documents/data_linking/splink_demos/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py:148: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead.\n", " warnings.warn(\n", "Probability two random records match is estimated to be 0.00389.\n", "This means that amongst all possible pairwise record comparisons, one in 257.25 are expected to match. With 499,500 total possible comparisons, we expect a total of around 1,941.67 matching pairs\n" ] } ], "source": [ "from splink.spark.spark_linker import SparkLinker\n", "linker = SparkLinker(df, settings)\n", "deterministic_rules = [\n", " \"l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1\",\n", " \"l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1\",\n", " \"l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2\",\n", " \"l.email = r.email\"\n", "]\n", "\n", "linker.estimate_probability_two_random_records_match(deterministic_rules, recall=0.6)\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "----- Estimating u probabilities using random sampling ----- \n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:14 WARN DataSource: All paths were ignored:\n", " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_concat_with_tf\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \n", "Estimated u probabilities using random sampling\n", "\n", "Your model is not yet fully trained. Missing estimates for:\n", " - first_name (no m values are trained).\n", " - surname (no m values are trained).\n", " - dob (no m values are trained).\n", " - city (no m values are trained).\n", " - email (no m values are trained).\n" ] } ], "source": [ "linker.estimate_u_using_random_sampling(target_rows=5e5)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\n", "----- Starting EM training session -----\n", "\n", "Estimating the m probabilities of the model by blocking on:\n", "l.first_name = r.first_name and l.surname = r.surname\n", "\n", "Parameter estimates will be made for the following comparison(s):\n", " - dob\n", " - city\n", " - email\n", "\n", "Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: \n", " - first_name\n", " - surname\n", " \n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:20 WARN DataSource: All paths were ignored:\n", " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_77ebe74\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Iteration 1: Largest change in params was -0.53 in the m_probability of dob, level `Exact match`\n", "Iteration 2: Largest change in params was 0.0335 in probability_two_random_records_match\n", "Iteration 3: Largest change in params was 0.0129 in probability_two_random_records_match\n", "Iteration 4: Largest change in params was 0.00639 in probability_two_random_records_match\n", "\n", "EM converged after 4 iterations\n", "\n", "Your model is not yet fully trained. Missing estimates for:\n", " - first_name (no m values are trained).\n", " - surname (no m values are trained).\n", "\n", "----- Starting EM training session -----\n", "\n", "Estimating the m probabilities of the model by blocking on:\n", "l.dob = r.dob\n", "\n", "Parameter estimates will be made for the following comparison(s):\n", " - first_name\n", " - surname\n", " - city\n", " - email\n", "\n", "Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: \n", " - dob\n", " \n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:26 WARN DataSource: All paths were ignored:\n", " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_2bd95d3\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Iteration 1: Largest change in params was -0.413 in the m_probability of surname, level `Exact match`\n", "Iteration 2: Largest change in params was 0.108 in probability_two_random_records_match\n", "Iteration 3: Largest change in params was 0.0348 in probability_two_random_records_match\n", "Iteration 4: Largest change in params was 0.0133 in probability_two_random_records_match\n", "Iteration 5: Largest change in params was 0.00593 in probability_two_random_records_match\n", "\n", "EM converged after 5 iterations\n", "\n", "Your model is fully trained. All comparisons have at least one estimate for their m and u values\n" ] } ], "source": [ "training_blocking_rule = \"l.first_name = r.first_name and l.surname = r.surname\"\n", "training_session_fname_sname = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)\n", "\n", "training_blocking_rule = \"l.dob = r.dob\"\n", "training_session_dob = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n", "22/09/19 14:39:32 WARN DataSource: All paths were ignored:\n", " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_predict_99e9cb4\n" ] } ], "source": [ "results = linker.predict(threshold_match_probability=0.9)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", " | match_weight | \n", "match_probability | \n", "unique_id_l | \n", "unique_id_r | \n", "first_name_l | \n", "first_name_r | \n", "gamma_first_name | \n", "bf_first_name | \n", "surname_l | \n", "surname_r | \n", "... | \n", "gamma_city | \n", "tf_city_l | \n", "tf_city_r | \n", "bf_city | \n", "bf_tf_adj_city | \n", "email_l | \n", "email_r | \n", "gamma_email | \n", "bf_email | \n", "match_key | \n", "
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | \n", "3.371739 | \n", "0.911904 | \n", "220 | \n", "223 | \n", "Logan | \n", "Logan | \n", "2 | \n", "86.748396 | \n", "serguFon | \n", "Ferguson | \n", "... | \n", "1 | \n", "0.212792 | \n", "0.212792 | \n", "10.316002 | \n", "0.259162 | \n", "l.feruson46@sahh.com | \n", "None | \n", "-1 | \n", "1.000000 | \n", "0 | \n", "
1 | \n", "14.743406 | \n", "0.999964 | \n", "879 | \n", "880 | \n", "Leo | \n", "Leo | \n", "2 | \n", "86.748396 | \n", "Webster | \n", "Webster | \n", "... | \n", "1 | \n", "0.008610 | \n", "0.008610 | \n", "10.316002 | \n", "6.404996 | \n", "leo.webster54@moore.biez | \n", "None | \n", "-1 | \n", "1.000000 | \n", "0 | \n", "
2 | \n", "13.140266 | \n", "0.999889 | \n", "446 | \n", "450 | \n", "Aisha | \n", "Aisha | \n", "2 | \n", "86.748396 | \n", "Bryant | \n", "None | \n", "... | \n", "0 | \n", "0.011070 | \n", "0.001230 | \n", "0.456259 | \n", "1.000000 | \n", "aishab64@obrien-flores.com | \n", "aishab64@obrien-flores.com | \n", "3 | \n", "257.458944 | \n", "0 | \n", "
3 | \n", "8.829126 | \n", "0.997806 | \n", "446 | \n", "448 | \n", "Aisha | \n", "Aisha | \n", "2 | \n", "86.748396 | \n", "Bryant | \n", "BryBant | \n", "... | \n", "0 | \n", "0.011070 | \n", "0.001230 | \n", "0.456259 | \n", "1.000000 | \n", "aishab64@obrien-flores.com | \n", "aishab64@obrien-flores.com | \n", "3 | \n", "257.458944 | \n", "0 | \n", "
4 | \n", "6.584844 | \n", "0.989690 | \n", "790 | \n", "791 | \n", "Jackson | \n", "Jackson | \n", "2 | \n", "86.748396 | \n", "Fisreh | \n", "Fishier | \n", "... | \n", "0 | \n", "0.009840 | \n", "0.001230 | \n", "0.456259 | \n", "1.000000 | \n", "j.fisher4@sullivan.com | \n", "None | \n", "-1 | \n", "1.000000 | \n", "0 | \n", "
5 rows × 28 columns
\n", "