{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Linking in Spark\n",
    "\n",
    "This notebook deduplicates the `fake_1000.csv` sample data using Splink's Spark backend: it starts a Spark session, defines the model settings, estimates the model parameters and scores candidate record pairs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "22/09/19 14:39:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "from pyspark import SparkConf, SparkContext\n",
    "from pyspark.sql import SparkSession, types\n",
    "\n",
    "import splink.spark.spark_comparison_library as cl\n",
    "from splink.spark.jar_location import similarity_jar_location\n",
    "\n",
    "# Paths are relative to the working directory the notebook is run from\n",
    "CHECKPOINT_DIR = \"./tmp_checkpoints\"\n",
    "INPUT_CSV = \"./data/fake_1000.csv\"\n",
    "\n",
    "conf = SparkConf()\n",
    "conf.set(\"spark.driver.memory\", \"12g\")\n",
    "# This parallelism setting is only suitable for a small toy example\n",
    "conf.set(\"spark.default.parallelism\", \"16\")\n",
    "\n",
    "# Add custom similarity functions, which are bundled with Splink\n",
    "# documented here: https://github.com/moj-analytical-services/splink_scalaudfs\n",
    "path = similarity_jar_location()\n",
    "conf.set(\"spark.jars\", path)\n",
    "\n",
    "sc = SparkContext.getOrCreate(conf=conf)\n",
    "\n",
    "spark = SparkSession(sc)\n",
    "spark.sparkContext.setCheckpointDir(CHECKPOINT_DIR)\n",
    "\n",
    "# Register the jaro winkler custom udf\n",
    "spark.udf.registerJavaFunction(\n",
    "    \"jaro_winkler\", \"uk.gov.moj.dash.linkage.JaroWinklerSimilarity\", types.DoubleType()\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# header=True: take the column names from the first row of the CSV\n",
    "df = spark.read.csv(INPUT_CSV, header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Model settings: one comparison per input column, plus the blocking rules\n",
    "# used when generating predictions\n",
    "settings = {\n",
    "    \"link_type\": \"dedupe_only\",\n",
    "    \"comparisons\": [\n",
    "        cl.jaro_winkler_at_thresholds(\"first_name\", 0.8),\n",
    "        cl.jaro_winkler_at_thresholds(\"surname\", 0.8),\n",
    "        cl.levenshtein_at_thresholds(\"dob\"),\n",
    "        cl.exact_match(\"city\", term_frequency_adjustments=True),\n",
    "        cl.levenshtein_at_thresholds(\"email\"),\n",
    "    ],\n",
    "    \"blocking_rules_to_generate_predictions\": [\n",
    "        \"l.first_name = r.first_name\",\n",
    "        \"l.surname = r.surname\",\n",
    "    ],\n",
    "    \"retain_matching_columns\": True,\n",
    "    \"retain_intermediate_calculation_columns\": True,\n",
    "    \"em_convergence\": 0.01,\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/robinlinacre/Documents/data_linking/splink_demos/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py:148: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead.\n",
      " warnings.warn(\n",
      "Probability two random records match is estimated to be 0.00389.\n",
      "This means that amongst all possible pairwise record comparisons, one in 257.25 are expected to match. 
With 499,500 total possible comparisons, we expect a total of around 1,941.67 matching pairs\n"
     ]
    }
   ],
   "source": [
    "from splink.spark.spark_linker import SparkLinker\n",
    "\n",
    "# Build the linker from the Spark DataFrame and the settings dictionary\n",
    "linker = SparkLinker(df, settings)\n",
    "\n",
    "# SQL conditions supplied as the deterministic matching rules\n",
    "deterministic_rules = [\n",
    "    \"l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1\",\n",
    "    \"l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1\",\n",
    "    \"l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2\",\n",
    "    \"l.email = r.email\",\n",
    "]\n",
    "\n",
    "linker.estimate_probability_two_random_records_match(\n",
    "    deterministic_rules,\n",
    "    recall=0.6,\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "----- Estimating u probabilities using random sampling ----- \n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "22/09/19 14:39:14 WARN DataSource: All paths were ignored:\n",
      " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_concat_with_tf\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      " \n",
      "Estimated u probabilities using random sampling\n",
      "\n",
      "Your model is not yet fully trained. 
Missing estimates for:\n",
      " - first_name (no m values are trained).\n",
      " - surname (no m values are trained).\n",
      " - dob (no m values are trained).\n",
      " - city (no m values are trained).\n",
      " - email (no m values are trained).\n"
     ]
    }
   ],
   "source": [
    "# Estimate the u probabilities by random sampling; target_rows sets the sample size\n",
    "linker.estimate_u_using_random_sampling(\n",
    "    target_rows=5e5,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\n",
      "----- Starting EM training session -----\n",
      "\n",
      "Estimating the m probabilities of the model by blocking on:\n",
      "l.first_name = r.first_name and l.surname = r.surname\n",
      "\n",
      "Parameter estimates will be made for the following comparison(s):\n",
      " - dob\n",
      " - city\n",
      " - email\n",
      "\n",
      "Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: \n",
      " - first_name\n",
      " - surname\n",
      " \n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "22/09/19 14:39:20 WARN DataSource: All paths were ignored:\n",
      " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_77ebe74\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Iteration 1: Largest change in params was -0.53 in the m_probability of dob, level `Exact match`\n",
      "Iteration 2: Largest change in params was 0.0335 in probability_two_random_records_match\n",
      "Iteration 3: Largest change in params was 0.0129 in probability_two_random_records_match\n",
      "Iteration 4: Largest change in params was 0.00639 in probability_two_random_records_match\n",
      "\n",
      "EM converged after 4 iterations\n",
      "\n",
      "Your model is not yet fully trained. 
Missing estimates for:\n", " - first_name (no m values are trained).\n", " - surname (no m values are trained).\n", "\n", "----- Starting EM training session -----\n", "\n", "Estimating the m probabilities of the model by blocking on:\n", "l.dob = r.dob\n", "\n", "Parameter estimates will be made for the following comparison(s):\n", " - first_name\n", " - surname\n", " - city\n", " - email\n", "\n", "Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: \n", " - dob\n", " \n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "22/09/19 14:39:26 WARN DataSource: All paths were ignored:\n", " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_2bd95d3\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Iteration 1: Largest change in params was -0.413 in the m_probability of surname, level `Exact match`\n", "Iteration 2: Largest change in params was 0.108 in probability_two_random_records_match\n", "Iteration 3: Largest change in params was 0.0348 in probability_two_random_records_match\n", "Iteration 4: Largest change in params was 0.0133 in probability_two_random_records_match\n", "Iteration 5: Largest change in params was 0.00593 in probability_two_random_records_match\n", "\n", "EM converged after 5 iterations\n", "\n", "Your model is fully trained. 
All comparisons have at least one estimate for their m and u values\n"
     ]
    }
   ],
   "source": [
    "# Comparisons on the columns used in a blocking rule cannot be estimated in that\n",
    "# pass (see the training log), so EM is run twice with different blocking rules.\n",
    "training_blocking_rule = \"l.first_name = r.first_name and l.surname = r.surname\"\n",
    "training_session_fname_sname = (\n",
    "    linker.estimate_parameters_using_expectation_maximisation(\n",
    "        training_blocking_rule\n",
    "    )\n",
    ")\n",
    "\n",
    "training_blocking_rule = \"l.dob = r.dob\"\n",
    "training_session_dob = (\n",
    "    linker.estimate_parameters_using_expectation_maximisation(\n",
    "        training_blocking_rule\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "22/09/19 14:39:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n",
      "22/09/19 14:39:32 WARN DataSource: All paths were ignored:\n",
      " file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_predict_99e9cb4\n"
     ]
    }
   ],
   "source": [
    "# Score candidate pairs, filtering on a match-probability threshold of 0.9\n",
    "results = linker.predict(\n",
    "    threshold_match_probability=0.9,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
match_weightmatch_probabilityunique_id_lunique_id_rfirst_name_lfirst_name_rgamma_first_namebf_first_namesurname_lsurname_r...gamma_citytf_city_ltf_city_rbf_citybf_tf_adj_cityemail_lemail_rgamma_emailbf_emailmatch_key
03.3717390.911904220223LoganLogan286.748396serguFonFerguson...10.2127920.21279210.3160020.259162l.feruson46@sahh.comNone-11.0000000
114.7434060.999964879880LeoLeo286.748396WebsterWebster...10.0086100.00861010.3160026.404996leo.webster54@moore.biezNone-11.0000000
213.1402660.999889446450AishaAisha286.748396BryantNone...00.0110700.0012300.4562591.000000aishab64@obrien-flores.comaishab64@obrien-flores.com3257.4589440
38.8291260.997806446448AishaAisha286.748396BryantBryBant...00.0110700.0012300.4562591.000000aishab64@obrien-flores.comaishab64@obrien-flores.com3257.4589440
46.5848440.989690790791JacksonJackson286.748396FisrehFishier...00.0098400.0012300.4562591.000000j.fisher4@sullivan.comNone-11.0000000
\n", "

5 rows × 28 columns

\n", "
" ],
      "text/plain": [
       " match_weight match_probability unique_id_l unique_id_r first_name_l \\\n",
       "0 3.371739 0.911904 220 223 Logan \n",
       "1 14.743406 0.999964 879 880 Leo \n",
       "2 13.140266 0.999889 446 450 Aisha \n",
       "3 8.829126 0.997806 446 448 Aisha \n",
       "4 6.584844 0.989690 790 791 Jackson \n",
       "\n",
       " first_name_r gamma_first_name bf_first_name surname_l surname_r ... \\\n",
       "0 Logan 2 86.748396 serguFon Ferguson ... \n",
       "1 Leo 2 86.748396 Webster Webster ... \n",
       "2 Aisha 2 86.748396 Bryant None ... \n",
       "3 Aisha 2 86.748396 Bryant BryBant ... \n",
       "4 Jackson 2 86.748396 Fisreh Fishier ... \n",
       "\n",
       " gamma_city tf_city_l tf_city_r bf_city bf_tf_adj_city \\\n",
       "0 1 0.212792 0.212792 10.316002 0.259162 \n",
       "1 1 0.008610 0.008610 10.316002 6.404996 \n",
       "2 0 0.011070 0.001230 0.456259 1.000000 \n",
       "3 0 0.011070 0.001230 0.456259 1.000000 \n",
       "4 0 0.009840 0.001230 0.456259 1.000000 \n",
       "\n",
       " email_l email_r gamma_email \\\n",
       "0 l.feruson46@sahh.com None -1 \n",
       "1 leo.webster54@moore.biez None -1 \n",
       "2 aishab64@obrien-flores.com aishab64@obrien-flores.com 3 \n",
       "3 aishab64@obrien-flores.com aishab64@obrien-flores.com 3 \n",
       "4 j.fisher4@sullivan.com None -1 \n",
       "\n",
       " bf_email match_key \n",
       "0 1.000000 0 \n",
       "1 1.000000 0 \n",
       "2 257.458944 0 \n",
       "3 257.458944 0 \n",
       "4 1.000000 0 \n",
       "\n",
       "[5 rows x 28 columns]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Preview five of the scored pairs as a pandas DataFrame\n",
    "results.as_pandas_dataframe(\n",
    "    limit=5,\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "splink_demos",
   "language": "python",
   "name": "splink_demos"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  },
  "vscode": {
   "interpreter": {
    "hash": "3b53fa520a31e303a9636a08ff10a3bbc14893ee50cb37445791fa59628fc75b"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}