Distributed learning

MACS 30500 University of Chicago

Distributed computing

Structured Query Language (SQL)

Getting started with SQLite

Connecting to the database

library(dplyr)
my_db <- DBI::dbConnect(RSQLite::SQLite(), dbname = ":memory:")
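
A file-backed database works identically; only the path changes. A minimal sketch (the file name here is hypothetical):

my_db_file <- DBI::dbConnect(RSQLite::SQLite(), dbname = "flights.sqlite")
DBI::dbDisconnect(my_db_file)  # close the connection when done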

Adding data to database

library(nycflights13)
copy_to(my_db,
        flights,
        temporary = FALSE,
        indexes = list(
          c("year", "month", "day"),
          "carrier",
          "tailnum"
        )
)
flights_db <- tbl(my_db, "flights")
flights_db
## # Source:   table<flights> [?? x 19]
## # Database: sqlite 3.22.0 []
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      517            515        2.      830
##  2  2013     1     1      533            529        4.      850
##  3  2013     1     1      542            540        2.      923
##  4  2013     1     1      544            545       -1.     1004
##  5  2013     1     1      554            600       -6.      812
##  6  2013     1     1      554            558       -4.      740
##  7  2013     1     1      555            600       -5.      913
##  8  2013     1     1      557            600       -3.      709
##  9  2013     1     1      557            600       -3.      838
## 10  2013     1     1      558            600       -2.      753
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>
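
To verify the copy worked, list the tables in the database:

DBI::dbListTables(my_db)
# expect "flights" (SQLite may also report its internal sqlite_stat tables)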

Basic verbs

select(flights_db, year:day, dep_delay, arr_delay)
## # Source:   lazy query [?? x 5]
## # Database: sqlite 3.19.3 []
##     year month   day dep_delay arr_delay
##    <int> <int> <int>     <dbl>     <dbl>
##  1  2013     1     1         2        11
##  2  2013     1     1         4        20
##  3  2013     1     1         2        33
##  4  2013     1     1        -1       -18
##  5  2013     1     1        -6       -25
##  6  2013     1     1        -4        12
##  7  2013     1     1        -5        19
##  8  2013     1     1        -3       -14
##  9  2013     1     1        -3        -8
## 10  2013     1     1        -2         8
## # ... with more rows
filter(flights_db, dep_delay > 240)
## # Source:   lazy query [?? x 19]
## # Database: sqlite 3.19.3 []
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      848           1835       853     1001
##  2  2013     1     1     1815           1325       290     2120
##  3  2013     1     1     1842           1422       260     1958
##  4  2013     1     1     2115           1700       255     2330
##  5  2013     1     1     2205           1720       285       46
##  6  2013     1     1     2343           1724       379      314
##  7  2013     1     2     1332            904       268     1616
##  8  2013     1     2     1412            838       334     1710
##  9  2013     1     2     1607           1030       337     2003
## 10  2013     1     2     2131           1512       379     2340
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>
arrange(flights_db, year, month, day)
## # Source:     table<flights> [?? x 19]
## # Database:   sqlite 3.19.3 []
## # Ordered by: year, month, day
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      517            515         2      830
##  2  2013     1     1      533            529         4      850
##  3  2013     1     1      542            540         2      923
##  4  2013     1     1      544            545        -1     1004
##  5  2013     1     1      554            600        -6      812
##  6  2013     1     1      554            558        -4      740
##  7  2013     1     1      555            600        -5      913
##  8  2013     1     1      557            600        -3      709
##  9  2013     1     1      557            600        -3      838
## 10  2013     1     1      558            600        -2      753
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>
mutate(flights_db, speed = distance / air_time * 60)
## # Source:   lazy query [?? x 20]
## # Database: sqlite 3.19.3 []
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      517            515         2      830
##  2  2013     1     1      533            529         4      850
##  3  2013     1     1      542            540         2      923
##  4  2013     1     1      544            545        -1     1004
##  5  2013     1     1      554            600        -6      812
##  6  2013     1     1      554            558        -4      740
##  7  2013     1     1      555            600        -5      913
##  8  2013     1     1      557            600        -3      709
##  9  2013     1     1      557            600        -3      838
## 10  2013     1     1      558            600        -2      753
## # ... with more rows, and 13 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>, speed <dbl>
summarise(flights_db, mean_dep_time = mean(dep_time))
## # Source:   lazy query [?? x 1]
## # Database: sqlite 3.19.3 []
##   mean_dep_time
##           <dbl>
## 1       1349.11

Backend transformation of dplyr functions

select(flights_db, year:day, dep_delay, arr_delay) %>%
  show_query()
## <SQL>
## SELECT `year` AS `year`, `month` AS `month`, `day` AS `day`, `dep_delay` AS `dep_delay`, `arr_delay` AS `arr_delay`
## FROM `flights`
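
The same translation happens for every verb. For instance, the filter() from above becomes a WHERE clause (roughly SELECT * FROM flights WHERE dep_delay > 240.0):

filter(flights_db, dep_delay > 240) %>%
  show_query()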

A lazy cat

The laziness of dbplyr

c1 <- filter(flights_db, year == 2013, month == 1, day == 1)
c2 <- select(c1, year, month, day, carrier, dep_delay, air_time, distance)
c3 <- mutate(c2, speed = distance / air_time * 60)
c4 <- arrange(c3, year, month, day, carrier)
c4
## # Source:     lazy query [?? x 8]
## # Database:   sqlite 3.19.3 []
## # Ordered by: year, month, day, carrier
##     year month   day carrier dep_delay air_time distance    speed
##    <int> <int> <int>   <chr>     <dbl>    <dbl>    <dbl>    <dbl>
##  1  2013     1     1      9E         0      189     1029 326.6667
##  2  2013     1     1      9E        -9       57      228 240.0000
##  3  2013     1     1      9E        -3       68      301 265.5882
##  4  2013     1     1      9E        -6       57      209 220.0000
##  5  2013     1     1      9E        -8       66      264 240.0000
##  6  2013     1     1      9E         0       40      184 276.0000
##  7  2013     1     1      9E         6      146      740 304.1096
##  8  2013     1     1      9E         0      139      665 287.0504
##  9  2013     1     1      9E        -8      150      765 306.0000
## 10  2013     1     1      9E        -6       41      187 273.6585
## # ... with more rows
collect(c4)
## # A tibble: 842 x 8
##     year month   day carrier dep_delay air_time distance    speed
##    <int> <int> <int>   <chr>     <dbl>    <dbl>    <dbl>    <dbl>
##  1  2013     1     1      9E         0      189     1029 326.6667
##  2  2013     1     1      9E        -9       57      228 240.0000
##  3  2013     1     1      9E        -3       68      301 265.5882
##  4  2013     1     1      9E        -6       57      209 220.0000
##  5  2013     1     1      9E        -8       66      264 240.0000
##  6  2013     1     1      9E         0       40      184 276.0000
##  7  2013     1     1      9E         6      146      740 304.1096
##  8  2013     1     1      9E         0      139      665 287.0504
##  9  2013     1     1      9E        -8      150      765 306.0000
## 10  2013     1     1      9E        -6       41      187 273.6585
## # ... with 832 more rows
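
collect() is not the only way to force work. compute() executes the query and stores the result in a temporary table on the database, while collapse() forces the pipeline into a subquery but stays lazy:

c4_computed <- compute(c4)    # run the query, keep the result server-side
c4_collapsed <- collapse(c4)  # still lazy; generates a nested subquery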

Google BigQuery

  • Distributed cloud platform for data warehousing and analytics
  • Extremely efficient
  • Large capacity
  • Flexible pricing

Interacting with Google BigQuery via dplyr

library(bigrquery)

taxi <- DBI::dbConnect(bigrquery::bigquery(),
                       project = "nyc-tlc",
                       dataset = "yellow",
                       billing = getOption("bigquery_id"))
taxi
## <BigQueryConnection>
##   Dataset: nyc-tlc:yellow
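
Since this is an ordinary DBI connection, the usual inspection tools apply (assuming bigrquery's DBI interface):

DBI::dbListTables(taxi)
# expect "trips" among the tables in the nyc-tlc:yellow dataset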

In 2014, how many trips were taken each month in yellow cabs? First in BigQuery's legacy SQL, then as the equivalent dplyr pipeline:

SELECT
  LEFT(STRING(pickup_datetime), 7) month,
  COUNT(*) trips
FROM
  [nyc-tlc:yellow.trips]
WHERE
  YEAR(pickup_datetime) = 2014
GROUP BY
  1
ORDER BY
  1
system.time({
  trips_by_month <- taxi %>%
    tbl("trips") %>%
    filter(year(pickup_datetime) == 2014) %>%
    mutate(month = month(pickup_datetime)) %>%
    count(month) %>%
    arrange(month) %>%
    collect()
})
##    user  system elapsed 
##   0.291   0.015   4.491
trips_by_month
## # A tibble: 12 x 2
##    month        n
##    <int>    <int>
##  1     1 13782492
##  2     2 13063791
##  3     3 15428127
##  4     4 14618759
##  5     5 14774041
##  6     6 13813029
##  7     7 13106365
##  8     8 12688877
##  9     9 13374016
## 10    10 14232487
## 11    11 13218216
## 12    12 13014161

What is the average speed per hour of day in yellow cabs?

system.time({
  speed_per_hour <- taxi %>%
    tbl("trips") %>%
    mutate(hour = hour(pickup_datetime),
           # timestamp differences are in microseconds;
           # 3,600,000,000 microseconds = 1 hour
           trip_duration = (dropoff_datetime - pickup_datetime) /
             3600000000) %>%
    mutate(speed = trip_distance / trip_duration) %>%
    # keep only trips with a plausible fare per mile
    filter(fare_amount / trip_distance >= 2,
           fare_amount / trip_distance <= 10) %>%
    group_by(hour) %>%
    summarize(speed = mean(speed)) %>%
    arrange(hour) %>%
    collect()
})
##    user  system elapsed 
##   0.227   0.010   4.329
library(ggplot2)
ggplot(speed_per_hour, aes(hour, speed)) +
  geom_line() +
  labs(title = "Average Speed of NYC Yellow Taxis",
       x = "Hour of day",
       y = "Average speed, in MPH")

What is the average speed by day of the week?

system.time({
  speed_per_day <- taxi %>%
    tbl("trips") %>%
    mutate(hour = hour(pickup_datetime),
           day = dayofweek(pickup_datetime),
           # convert the microsecond difference to hours
           trip_duration = (dropoff_datetime - pickup_datetime) /
             3600000000) %>%
    mutate(speed = trip_distance / trip_duration) %>%
    # plausible fares per mile, daytime trips only
    filter(fare_amount / trip_distance >= 2,
           fare_amount / trip_distance <= 10,
           hour >= 8,
           hour <= 18) %>%
    group_by(day) %>%
    summarize(speed = mean(speed)) %>%
    arrange(day) %>%
    collect()
})
##    user  system elapsed 
##   0.145   0.006   4.688
speed_per_day
## # A tibble: 7 x 2
##     day    speed
##   <int>    <dbl>
## 1     1 14.29703
## 2     2 12.21557
## 3     3 11.11933
## 4     4 10.93281
## 5     5 10.97011
## 6     6 11.24917
## 7     7 13.09473

Split-apply-combine

  • Split data into pieces
  • Apply some function to each piece
  • Combine the results back together again

dplyr::group_by()

library(gapminder)

gapminder %>%
  group_by(continent) %>%
  summarize(n = n())
## # A tibble: 5 x 2
##   continent     n
##      <fctr> <int>
## 1    Africa   624
## 2  Americas   300
## 3      Asia   396
## 4    Europe   360
## 5   Oceania    24
gapminder %>%
  group_by(continent) %>%
  summarize(avg_lifeExp = mean(lifeExp))
## # A tibble: 5 x 2
##   continent avg_lifeExp
##      <fctr>       <dbl>
## 1    Africa    48.86533
## 2  Americas    64.65874
## 3      Asia    60.06490
## 4    Europe    71.90369
## 5   Oceania    74.32621

for loops

countries <- unique(gapminder$country)
lifeExp_models <- vector("list", length(countries))
names(lifeExp_models) <- countries

for(i in seq_along(countries)){
  lifeExp_models[[i]] <- lm(lifeExp ~ year,
                            data = filter(gapminder,
                                          country == countries[[i]]))
}
head(lifeExp_models)
## $Afghanistan
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##   -507.5343       0.2753  
## 
## 
## $Albania
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##   -594.0725       0.3347  
## 
## 
## $Algeria
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##  -1067.8590       0.5693  
## 
## 
## $Angola
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##   -376.5048       0.2093  
## 
## 
## $Argentina
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##   -389.6063       0.2317  
## 
## 
## $Australia
## 
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country == 
##     countries[[i]]))
## 
## Coefficients:
## (Intercept)         year  
##   -376.1163       0.2277
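
The models now live in a named list, so the usual list tools apply. A sketch of extracting each country's estimated slope on year with purrr:

library(purrr)
lifeExp_models %>%
  map_dbl(~ coef(.x)[["year"]]) %>%
  head()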

nest() and map()

library(tidyr)
library(purrr)

# function to estimate linear model for gapminder subsets
le_vs_yr <- function(df) {
  lm(lifeExp ~ year, data = df)
}

# split data into nests
(gap_nested <- gapminder %>% 
   group_by(continent, country) %>% 
   nest())
## # A tibble: 142 x 3
##    continent     country              data
##       <fctr>      <fctr>            <list>
##  1      Asia Afghanistan <tibble [12 x 4]>
##  2    Europe     Albania <tibble [12 x 4]>
##  3    Africa     Algeria <tibble [12 x 4]>
##  4    Africa      Angola <tibble [12 x 4]>
##  5  Americas   Argentina <tibble [12 x 4]>
##  6   Oceania   Australia <tibble [12 x 4]>
##  7    Europe     Austria <tibble [12 x 4]>
##  8      Asia     Bahrain <tibble [12 x 4]>
##  9      Asia  Bangladesh <tibble [12 x 4]>
## 10    Europe     Belgium <tibble [12 x 4]>
## # ... with 132 more rows
# apply a linear model to each nested data frame
(gap_nested <- gap_nested %>% 
   mutate(fit = map(data, le_vs_yr)))
## # A tibble: 142 x 4
##    continent     country              data      fit
##       <fctr>      <fctr>            <list>   <list>
##  1      Asia Afghanistan <tibble [12 x 4]> <S3: lm>
##  2    Europe     Albania <tibble [12 x 4]> <S3: lm>
##  3    Africa     Algeria <tibble [12 x 4]> <S3: lm>
##  4    Africa      Angola <tibble [12 x 4]> <S3: lm>
##  5  Americas   Argentina <tibble [12 x 4]> <S3: lm>
##  6   Oceania   Australia <tibble [12 x 4]> <S3: lm>
##  7    Europe     Austria <tibble [12 x 4]> <S3: lm>
##  8      Asia     Bahrain <tibble [12 x 4]> <S3: lm>
##  9      Asia  Bangladesh <tibble [12 x 4]> <S3: lm>
## 10    Europe     Belgium <tibble [12 x 4]> <S3: lm>
## # ... with 132 more rows
# combine the results back into a single data frame
library(broom)
(gap_nested <- gap_nested %>% 
  mutate(tidy = map(fit, tidy)))
## # A tibble: 142 x 5
##    continent     country              data      fit                 tidy
##       <fctr>      <fctr>            <list>   <list>               <list>
##  1      Asia Afghanistan <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  2    Europe     Albania <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  3    Africa     Algeria <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  4    Africa      Angola <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  5  Americas   Argentina <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  6   Oceania   Australia <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  7    Europe     Austria <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  8      Asia     Bahrain <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
##  9      Asia  Bangladesh <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 10    Europe     Belgium <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## # ... with 132 more rows
(gap_coefs <- gap_nested %>% 
   select(continent, country, tidy) %>% 
   unnest(tidy))
## # A tibble: 284 x 7
##    continent     country        term      estimate    std.error  statistic
##       <fctr>      <fctr>       <chr>         <dbl>        <dbl>      <dbl>
##  1      Asia Afghanistan (Intercept)  -507.5342716 40.484161954 -12.536613
##  2      Asia Afghanistan        year     0.2753287  0.020450934  13.462890
##  3    Europe     Albania (Intercept)  -594.0725110 65.655359062  -9.048348
##  4    Europe     Albania        year     0.3346832  0.033166387  10.091036
##  5    Africa     Algeria (Intercept) -1067.8590396 43.802200843 -24.379118
##  6    Africa     Algeria        year     0.5692797  0.022127070  25.727749
##  7    Africa      Angola (Intercept)  -376.5047531 46.583370599  -8.082385
##  8    Africa      Angola        year     0.2093399  0.023532003   8.895964
##  9  Americas   Argentina (Intercept)  -389.6063445  9.677729641 -40.258031
## 10  Americas   Argentina        year     0.2317084  0.004888791  47.395847
## # ... with 274 more rows, and 1 more variables: p.value <dbl>
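
Because the coefficients now sit in a regular tidy data frame, ordinary dplyr verbs complete the "combine" step. For example, the average growth rate in life expectancy by continent:

gap_coefs %>%
  filter(term == "year") %>%
  group_by(continent) %>%
  summarize(avg_rate = mean(estimate))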

Parallel computing

From An introduction to parallel programming using Python’s multiprocessing module

  • Parallel processing
  • Serial processing
  • Multithreading

Why use parallel computing

  • Imitates real life
  • More efficient (maybe)
  • Tackle larger problems
  • Distributed resources

Why not to use parallel computing

Amdahl’s Law from Wikipedia

  • Limits to efficiency gains
  • Complexity
  • Dependencies
  • Parallel slowdown

multidplyr

  1. partition() to split the dataset across multiple cores
  2. Apply dplyr verbs to the partitioned data frame (a party_df)
  3. collect() the data

nycflights13::flights

library(multidplyr)
library(nycflights13)
flights1 <- partition(flights, flight)
flights2 <- summarize(flights1, dep_delay = mean(dep_delay, na.rm = TRUE))
flights3 <- collect(flights2)
flights2
## Source: party_df [3,844 x 2]
## Shards: 3 [1,269--1,306 rows]
## 
## # S3: party_df
##    flight  dep_delay
##     <int>      <dbl>
##  1      2 -0.5686275
##  2      3  3.6650794
##  3      4  7.5166240
##  4      6  8.5024155
##  5      8  6.9358974
##  6     10 24.3114754
##  7     12 28.2834646
##  8     15 10.2643080
##  9     20  3.7053571
## 10     22 20.2528736
## # ... with 3,834 more rows

Performance of multidplyr

system.time({
  flights %>% 
    partition() %>%
    summarise(mean(dep_delay, na.rm = TRUE)) %>% 
    collect()
})
##    user  system elapsed 
##   0.442   0.058   0.978
system.time({
  flights %>% 
    group_by() %>%
    summarise(mean(dep_delay, na.rm = TRUE))
})
##    user  system elapsed 
##   0.007   0.000   0.006
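
The serial version wins easily here: for a dataset this small, the cost of moving data to and from the worker processes swamps any gains from parallelism. partition() uses a default cluster, but you can also create one explicitly (a sketch, assuming multidplyr's API at the time of writing):

cluster <- create_cluster(3)   # launch 3 worker R processes
set_default_cluster(cluster)   # partition() will use this cluster by default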

gapminder

Parallel processing

# split data into nests
gap_nested <- gapminder %>% 
  group_by(continent, country) %>% 
  nest()

# partition gap_nested across the cores
gap_nested_part <- gap_nested %>%
  partition(country)

# apply a linear model to each nested data frame
cluster_library(gap_nested_part, "purrr")

system.time({
  gap_nested_part %>% 
    mutate(fit = map(data, function(df) lm(lifeExp ~ year, data = df)))
})
##    user  system elapsed 
##   0.001   0.000   0.106

Serial processing

system.time({
  gap_nested %>% 
    mutate(fit = map(data, function(df) lm(lifeExp ~ year, data = df)))
})
##    user  system elapsed 
##   0.106   0.003   0.112

Hadoop and Spark

  • Open-source software library for distributed processing of large data sets
  • Typically across clusters of computers
  • Highly scalable
  • Modules
    • Hadoop Distributed File System (HDFS)
    • Hadoop MapReduce
  • Spark: technically a separate project, but commonly run on Hadoop clusters as a faster replacement for MapReduce

sparklyr

  • Connect to Spark from R using the dplyr interface
  • Interact with SQL databases stored on a Spark cluster
  • Implement distributed statistical learning algorithms

Installation

install.packages("sparklyr")
library(sparklyr)
spark_install(version = "2.1.0")

Connecting to Spark

library(sparklyr)
sc <- spark_connect(master = "local")
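
master = "local" runs Spark on your own machine; on a real cluster you would pass the cluster manager's URL instead. Close the connection once the session is over:

# run at the end of the session, after all Spark work is done
spark_disconnect(sc)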

Reading data

install.packages("babynames")
library(babynames)
babynames_tbl <- copy_to(sc, babynames, "babynames", overwrite = TRUE)
applicants_tbl <- copy_to(sc, applicants, "applicants", overwrite = TRUE)

babynames_tbl
## # Source:   table<babynames> [?? x 5]
## # Database: spark_connection
##     year sex   name          n   prop
##    <dbl> <chr> <chr>     <int>  <dbl>
##  1 1880. F     Mary       7065 0.0724
##  2 1880. F     Anna       2604 0.0267
##  3 1880. F     Emma       2003 0.0205
##  4 1880. F     Elizabeth  1939 0.0199
##  5 1880. F     Minnie     1746 0.0179
##  6 1880. F     Margaret   1578 0.0162
##  7 1880. F     Ida        1472 0.0151
##  8 1880. F     Alice      1414 0.0145
##  9 1880. F     Bertha     1320 0.0135
## 10 1880. F     Sarah      1288 0.0132
## # ... with more rows
applicants_tbl
## # Source:   table<applicants> [?? x 3]
## # Database: spark_connection
##     year sex    n_all
##    <int> <chr>  <int>
##  1  1880 F      97604
##  2  1880 M     118399
##  3  1881 F      98855
##  4  1881 M     108282
##  5  1882 F     115696
##  6  1882 M     122031
##  7  1883 F     120059
##  8  1883 M     112478
##  9  1884 F     137586
## 10  1884 M     122739
## # ... with more rows
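
copy_to() returns a reference to a remote table; the data itself now lives in Spark. To see everything registered with the connection:

src_tbls(sc)
# expect "applicants" and "babynames" in the result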

Using dplyr

birthsYearly <- applicants_tbl %>%
  mutate(sex = if_else(sex == "M", "male", "female"),
         n_all = n_all / 1000000) %>%
  collect()

ggplot(birthsYearly, aes(year, n_all, fill = sex)) +
  geom_area(position = "stack") +
  scale_fill_brewer(type = "qual") +
  labs(title = "Total US Births",
       y = "Millions",
       fill = NULL,
       caption = "Source: SSA")

Compared to non-DB dplyr

birthsYearly <- applicants %>%
  mutate(sex = if_else(sex == "M", "male", "female"),
         n_all = n_all / 1000000)

ggplot(birthsYearly, aes(year, n_all, fill = sex)) +
  geom_area(position = "stack") +
  scale_fill_brewer(type = "qual") +
  labs(title = "Total US Births",
       y = "Millions",
       fill = NULL,
       caption = "Source: SSA")

Lookup table

(topNames_tbl <- babynames_tbl %>%
  filter(year >= 1986) %>%  
  group_by(name, sex) %>%
  summarize(count = as.numeric(sum(n))) %>%
  filter(count > 1000) %>%
  select(name, sex))
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
## # Groups:   name
##         name   sex
##        <chr> <chr>
##  1   Jessica     F
##  2    Ashley     F
##  3  Jennifer     F
##  4    Nicole     F
##  5   Heather     F
##  6 Elizabeth     F
##  7     Megan     F
##  8   Melissa     F
##  9  Danielle     F
## 10       Amy     F
## # ... with more rows
(filteredNames_tbl <- babynames_tbl %>%
  filter(year >= 1986) %>%
  inner_join(topNames_tbl))
## # Source:   lazy query [?? x 5]
## # Database: spark_connection
##     year   sex  name     n         prop
##    <dbl> <chr> <chr> <int>        <dbl>
##  1  1998     F Aanya     5 2.580377e-06
##  2  1999     F Aanya     5 2.569595e-06
##  3  2000     F Aanya     7 3.509720e-06
##  4  2001     F Aanya    20 1.010320e-05
##  5  2002     F Aanya    20 1.013399e-05
##  6  2003     F Aanya    24 1.197048e-05
##  7  2004     F Aanya    39 1.934463e-05
##  8  2005     F Aanya    39 1.923544e-05
##  9  2006     F Aanya    92 4.405580e-05
## 10  2007     F Aanya   125 5.913224e-05
## # ... with more rows
(yearlyNames_tbl <- filteredNames_tbl %>%
  group_by(year, name, sex) %>%
  summarize(count = as.numeric(sum(n))))
## # Source:   lazy query [?? x 4]
## # Database: spark_connection
## # Groups:   year, name
##     year     name   sex count
##    <dbl>    <chr> <chr> <dbl>
##  1  1986  Abagail     F    23
##  2  1986     Abbi     F    37
##  3  1987     Abbi     F    40
##  4  1986 Abbygail     F     7
##  5  2002  Addelyn     F    12
##  6  1987    Addie     F   101
##  7  1987  Addison     F    43
##  8  1995  Addisyn     F     5
##  9  1996  Addisyn     F     6
## 10  1990  Addyson     F     5
## # ... with more rows
sdf_register(yearlyNames_tbl, "yearlyNames")
## # Source:   table<yearlyNames> [?? x 4]
## # Database: spark_connection
##     year     name   sex count
##    <dbl>    <chr> <chr> <dbl>
##  1  1986  Abagail     F    23
##  2  1986     Abbi     F    37
##  3  1987     Abbi     F    40
##  4  1986 Abbygail     F     7
##  5  2002  Addelyn     F    12
##  6  1987    Addie     F   101
##  7  1987  Addison     F    43
##  8  1995  Addisyn     F     5
##  9  1996  Addisyn     F     6
## 10  1990  Addyson     F     5
## # ... with more rows
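
sdf_register() only registers the query as a named view; nothing is materialized until the table is used. For repeated access, sparklyr can cache the table in Spark's memory:

tbl_cache(sc, "yearlyNames")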

Using a lookup table

topNames1986_tbl <- yearlyNames_tbl %>%
  filter(year == 1986) %>%
  group_by(name, sex) %>%
  summarize(count = sum(count)) %>%
  group_by(sex) %>%
  mutate(rank = min_rank(desc(count))) %>%
  filter(rank < 5) %>%
  arrange(sex, rank) %>%
  select(name, sex, rank) %>%
  sdf_register("topNames1986")

topNames1986Yearly <- yearlyNames_tbl %>%
  inner_join(select(topNames1986_tbl, sex, name)) %>%
  mutate(sex = if_else(sex == "M", "Male", "Female")) %>%
  collect()

ggplot(topNames1986Yearly, aes(year, count, color = name)) +
  facet_grid(~ sex) +
  geom_line() +
  scale_color_brewer(type = "qual") +
  labs(title = "Most Popular Names of 1986",
       x = "Year",
       y = "Number of children born",
       caption = "Source: SSA")

Using a lookup table

topNames2014_tbl <- yearlyNames_tbl %>%
  filter(year == 2014) %>%
  group_by(name, sex) %>%
  summarize(count = sum(count)) %>%
  group_by(sex) %>%
  mutate(rank = min_rank(desc(count))) %>%
  filter(rank < 5) %>%
  arrange(sex, rank) %>%
  select(name, sex, rank) %>%
  sdf_register("topNames2014")

topNames2014Yearly <- yearlyNames_tbl %>%
  inner_join(select(topNames2014_tbl, sex, name)) %>%
  mutate(sex = if_else(sex == "M", "Male", "Female")) %>%
  collect()

ggplot(topNames2014Yearly, aes(year, count, color = name)) +
  facet_grid(~ sex) +
  geom_line() +
  scale_color_brewer(type = "qual") +
  labs(title = "Most Popular Names of 2014",
       x = "Year",
       y = "Number of children born",
       caption = "Source: SSA")

Machine learning with Spark

  • caret::train()
  • sparklyr::ml_*() family of functions

Load the data

library(titanic)
(titanic_tbl <- copy_to(sc, titanic::titanic_train, "titanic", overwrite = TRUE))
## # Source:   table<titanic> [?? x 12]
## # Database: spark_connection
##    PassengerId Survived Pclass Name   Sex     Age SibSp Parch Ticket  Fare
##          <int>    <int>  <int> <chr>  <chr> <dbl> <int> <int> <chr>  <dbl>
##  1           1        0      3 Braun… male    22.     1     0 A/5 2…  7.25
##  2           2        1      1 Cumin… fema…   38.     1     0 PC 17… 71.3 
##  3           3        1      3 Heikk… fema…   26.     0     0 STON/…  7.92
##  4           4        1      1 Futre… fema…   35.     1     0 113803 53.1 
##  5           5        0      3 Allen… male    35.     0     0 373450  8.05
##  6           6        0      3 Moran… male   NaN      0     0 330877  8.46
##  7           7        0      1 McCar… male    54.     0     0 17463  51.9 
##  8           8        0      3 Palss… male     2.     3     1 349909 21.1 
##  9           9        1      3 Johns… fema…   27.     0     2 347742 11.1 
## 10          10        1      2 Nasse… fema…   14.     1     0 237736 30.1 
## # ... with more rows, and 2 more variables: Cabin <chr>, Embarked <chr>

Tidy the data

titanic2_tbl <- titanic_tbl %>% 
  mutate(Family_Size = SibSp + Parch + 1L) %>%            # family size includes the passenger
  mutate(Pclass = as.character(Pclass)) %>%               # treat passenger class as categorical
  filter(!is.na(Embarked)) %>%
  mutate(Age = if_else(is.na(Age), mean(Age), Age)) %>%   # impute missing ages with the mean
  sdf_register("titanic2")

Spark ML transforms

titanic_final_tbl <- titanic2_tbl %>%
  mutate(Family_Size = as.numeric(Family_Size)) %>%   # ft_bucketizer() requires numeric input
  sdf_mutate(
    Family_Sizes = ft_bucketizer(Family_Size, splits = c(1, 2, 5, 12))
    ) %>%
  mutate(Family_Sizes = as.character(as.integer(Family_Sizes))) %>%
  sdf_register("titanic_final")

Train-validation split

# Partition the data
partition <- titanic_final_tbl %>% 
  mutate(Survived = as.numeric(Survived),
         SibSp = as.numeric(SibSp),
         Parch = as.numeric(Parch)) %>%
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, Family_Sizes) %>%
  sdf_partition(train = 0.75, test = 0.25, seed = 1234)

# Create table references
train_tbl <- partition$train
test_tbl <- partition$test
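
A quick sanity check on the split; sdf_nrow() counts rows inside Spark without collecting the data:

sdf_nrow(train_tbl)
sdf_nrow(test_tbl)
# the two counts should sum to the 889 rows remaining after the Embarked filter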

Train the models

# Model survival as a function of several predictors
ml_formula <- formula(Survived ~ Pclass + Sex + Age + SibSp +
                        Parch + Fare + Embarked + Family_Sizes)

# Train a logistic regression model
(ml_log <- ml_logistic_regression(train_tbl, ml_formula))
## Call: Survived ~ Pclass_2 + Pclass_3 + Sex_male + Age + SibSp + Parch + Fare + Embarked_Q + Embarked_S + Family_Sizes_1 + Family_Sizes_2
## 
## Coefficients:
##    (Intercept)       Pclass_2       Pclass_3       Sex_male            Age 
##    3.696483969   -1.102340720   -2.053772367   -2.786065613   -0.035430315 
##          SibSp          Parch           Fare     Embarked_Q     Embarked_S 
##    0.158164571    0.457385475    0.002038998    0.214551594   -0.159123177 
## Family_Sizes_1 Family_Sizes_2 
##   -0.306388170   -3.749909900
# Decision Tree
ml_dt <- ml_decision_tree(train_tbl, ml_formula)

# Random Forest
ml_rf <- ml_random_forest(train_tbl, ml_formula)

# Gradient Boosted Tree
ml_gbt <- ml_gradient_boosted_trees(train_tbl, ml_formula)

# Naive Bayes
ml_nb <- ml_naive_bayes(train_tbl, ml_formula)

# Neural Network: 11 input features, one hidden layer of 15 units,
# 2 output classes
ml_nn <- ml_multilayer_perceptron(train_tbl, ml_formula, layers = c(11, 15, 2))

Validate the models

# Bundle the models into a single list object
ml_models <- list(
  "Logistic" = ml_log,
  "Decision Tree" = ml_dt,
  "Random Forest" = ml_rf,
  "Gradient Boosted Trees" = ml_gbt,
  "Naive Bayes" = ml_nb,
  "Neural Net" = ml_nn
)

# Create a function for scoring
score_test_data <- function(model, data = test_tbl){
  pred <- sdf_predict(model, data)
  select(pred, Survived, prediction)
}

# Score all the models
ml_score <- map(ml_models, score_test_data)

Model lift

# Lift function: cumulative share of actual survivors captured in each
# decile of predicted probability
calculate_lift <- function(scored_data) {
  scored_data %>%
    mutate(bin = ntile(desc(prediction), 10)) %>%   # decile bins, highest predictions first
    group_by(bin) %>% 
    summarize(count = sum(Survived)) %>%            # survivors per bin
    mutate(prop = count / sum(count)) %>% 
    arrange(bin) %>% 
    mutate(prop = cumsum(prop)) %>%                 # cumulative proportion captured
    select(-count) %>% 
    collect() %>% 
    as.data.frame()
}

# Initialize results
ml_gains <- data_frame(
  bin = 1:10,
  prop = seq(0, 1, len = 10),
  model = "Base"
)

# Calculate lift
for(i in names(ml_score)){
  ml_gains <- ml_score[[i]] %>%
    calculate_lift %>%
    mutate(model = i) %>%
    bind_rows(ml_gains, .)
}

# Plot results
ggplot(ml_gains, aes(x = bin, y = prop, color = model)) +
  geom_point() +
  geom_line() +
  scale_color_brewer(type = "qual") +
  labs(title = "Lift Chart for Predicting Survival",
       subtitle = "Test Data Set",
       x = NULL,
       y = NULL)

Receiver operating characteristic (ROC) curves

From Receiver operating characteristic


Calculating AUC

# Function for calculating accuracy
calc_accuracy <- function(data, cutpoint = 0.5){
  data %>% 
    mutate(prediction = if_else(prediction > cutpoint, 1.0, 0.0)) %>%
    ml_classification_eval("prediction", "Survived", "accuracy")
}

# Calculate AUC and accuracy
perf_metrics <- data_frame(
  model = names(ml_score),
  AUC = 100 * map_dbl(ml_score, ml_binary_classification_eval, "Survived", "prediction"),
  Accuracy = 100 * map_dbl(ml_score, calc_accuracy)
  )
perf_metrics
## # A tibble: 6 x 3
##                    model      AUC Accuracy
##                    <chr>    <dbl>    <dbl>
## 1               Logistic 81.00928 82.43902
## 2          Decision Tree 87.78392 79.51220
## 3          Random Forest 86.90500 82.43902
## 4 Gradient Boosted Trees 85.89769 79.51220
## 5            Naive Bayes 66.47738 69.26829
## 6             Neural Net 79.75509 80.48780
# Plot results
gather(perf_metrics, metric, value, AUC, Accuracy) %>%
  ggplot(aes(reorder(model, value), value, fill = metric)) + 
  geom_bar(stat = "identity", position = "dodge") + 
  coord_flip() +
  labs(title = "Performance metrics",
       x = NULL,
       y = "Percent")

Feature importance

# Initialize results
feature_importance <- data_frame()

# Calculate feature importance
for(i in c("Decision Tree", "Random Forest", "Gradient Boosted Trees")){
  feature_importance <- ml_tree_feature_importance(sc, ml_models[[i]]) %>%
    mutate(Model = i) %>%
    # importance comes back as a factor; convert via its levels to numeric
    mutate(importance = as.numeric(levels(importance))[importance]) %>%
    mutate(feature = as.character(feature)) %>%
    rbind(feature_importance, .)
}

# Plot results
feature_importance %>%
  ggplot(aes(reorder(feature, importance), importance, fill = Model)) + 
  facet_wrap(~Model) +
  geom_bar(stat = "identity") + 
  coord_flip() +
  labs(title = "Feature importance",
       x = NULL) +
  theme(legend.position = "none")

Compare run times

Sparkling Water (H2O)

  • Limitations of MLlib (in sparklyr)
  • H2O
  • rsparkling

Prep data for H2O

library(rsparkling)
library(h2o)
titanic_h2o <- titanic_final_tbl %>% 
  mutate(Survived = as.numeric(Survived),
         SibSp = as.numeric(SibSp),
         Parch = as.numeric(Parch)) %>%
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, Family_Sizes) %>%
  as_h2o_frame(sc, ., strict_version_check = FALSE)

Estimate logistic regression with H2O

glm_model <- h2o.glm(x = c("Pclass", "Sex", "Age", "SibSp", "Parch",
                           "Fare", "Embarked", "Family_Sizes"), 
                     y = "Survived",
                     family = "binomial",
                     training_frame = titanic_h2o,
                     lambda_search = TRUE,
                     nfolds = 10)
glm_model
## Model Details:
## ==============
## 
## H2OBinomialModel: glm
## Model ID:  GLM_model_R_1509986513909_1 
## GLM Model: summary
##     family  link                                regularization
## 1 binomial logit Elastic Net (alpha = 0.5, lambda = 0.005469 )
##                                                                    lambda_search
## 1 nlambda = 100, lambda.max = 0.248, lambda.min = 0.005469, lambda.1se = 0.04647
##   number_of_predictors_total number_of_active_predictors
## 1                          4                           4
##   number_of_iterations                                   training_frame
## 1                   64 frame_rdd_13563_b525994613c77b1f2f5a43f14e2f420e
## 
## Coefficients: glm coefficients
##       names coefficients standardized_coefficients
## 1 Intercept    -0.255193                 -0.456986
## 2       Age    -0.021331                 -0.276632
## 3     SibSp    -0.271432                 -0.299581
## 4     Parch     0.094055                  0.075880
## 5      Fare     0.016725                  0.831198
## 
## H2OBinomialMetrics: glm
## ** Reported on training data. **
## 
## MSE:  0.2089208
## RMSE:  0.4570785
## LogLoss:  0.6125085
## Mean Per-Class Error:  0.3636157
## AUC:  0.7077119
## Gini:  0.4154238
## R^2:  0.1154266
## Residual Deviance:  1089.04
## AIC:  1099.04
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0   1    Error      Rate
## 0      287 262 0.477231  =262/549
## 1       85 255 0.250000   =85/340
## Totals 372 517 0.390326  =347/889
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.324016 0.595099 270
## 2                       max f2  0.181186 0.759607 394
## 3                 max f0point5  0.397161 0.646825 167
## 4                 max accuracy  0.397161 0.725534 167
## 5                max precision  0.999495 1.000000   0
## 6                   max recall  0.181186 1.000000 394
## 7              max specificity  0.999495 1.000000   0
## 8             max absolute_mcc  0.397161 0.396588 167
## 9   max min_per_class_accuracy  0.346634 0.635294 235
## 10 max mean_per_class_accuracy  0.397161 0.678686 167
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
## 
## H2OBinomialMetrics: glm
## ** Reported on cross-validation data. **
## ** 10-fold cross-validation on training data (Metrics computed for combined holdout predictions) **
## 
## MSE:  0.2118124
## RMSE:  0.4602309
## LogLoss:  0.6206943
## Mean Per-Class Error:  0.3758732
## AUC:  0.6961481
## Gini:  0.3922962
## R^2:  0.1031832
## Residual Deviance:  1103.594
## AIC:  1113.594
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0   1    Error      Rate
## 0      280 269 0.489982  =269/549
## 1       89 251 0.261765   =89/340
## Totals 369 520 0.402700  =358/889
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.325858 0.583721 263
## 2                       max f2  0.241754 0.759436 365
## 3                 max f0point5  0.409853 0.631443 157
## 4                 max accuracy  0.409853 0.716535 157
## 5                max precision  0.999812 1.000000   0
## 6                   max recall  0.171814 1.000000 392
## 7              max specificity  0.999812 1.000000   0
## 8             max absolute_mcc  0.409853 0.374206 157
## 9   max min_per_class_accuracy  0.347850 0.635294 231
## 10 max mean_per_class_accuracy  0.388387 0.670414 178
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
## Cross-Validation Metrics Summary: 
##                 mean          sd cv_1_valid cv_2_valid cv_3_valid
## accuracy   0.6857912  0.07408625  0.6826923  0.7373737  0.7777778
## auc        0.7040641 0.028921101  0.7515361  0.7008547   0.722973
## err       0.31420884  0.07408625 0.31730768 0.26262626 0.22222222
## err_count       27.6    5.685068       33.0       26.0       22.0
## f0point5   0.6299649  0.06931518  0.6132075 0.67484665   0.729927
##           cv_4_valid cv_5_valid cv_6_valid cv_7_valid cv_8_valid
## accuracy   0.7777778     0.7625  0.4868421 0.48809522 0.74418604
## auc        0.7071339 0.73145604  0.6047585 0.66211826  0.7140056
## err       0.22222222     0.2375  0.5131579  0.5119048 0.25581396
## err_count       18.0       19.0       39.0       43.0       22.0
## f0point5  0.77868855 0.65789473  0.5032467  0.4397394  0.7037037
##           cv_9_valid cv_10_valid
## accuracy   0.6703297   0.7303371
## auc         0.744324  0.70148027
## err       0.32967034  0.26966292
## err_count       30.0        24.0
## f0point5   0.5733945       0.625
## 
## ---
##                          mean          sd  cv_1_valid cv_2_valid
## precision           0.6365017  0.10264663   0.5652174  0.7096774
## r2                0.098378904  0.04136454 0.069770396 0.14930138
## recall              0.7115725  0.12688743   0.9285714  0.5641026
## residual_deviance   109.37238   7.7664247   135.47588  117.30765
## rmse               0.46011594 0.009886858  0.47324085 0.45067203
## specificity         0.6730586  0.19214197    0.516129       0.85
##                   cv_3_valid cv_4_valid  cv_5_valid  cv_6_valid
## precision                0.8  0.8636364   0.6451613  0.44927537
## r2                 0.1549875 0.15600768 0.063523754 0.013918524
## recall             0.5405405  0.5588235  0.71428573     0.96875
## residual_deviance  115.59444   96.23355    98.76699   102.43924
## rmse              0.44472656 0.45339072   0.4615716  0.49027994
## specificity       0.91935486  0.9361702  0.78846157  0.13636364
##                    cv_7_valid cv_8_valid cv_9_valid cv_10_valid
## precision           0.3857143       0.76 0.54347825  0.64285713
## r2                0.016913304 0.13284658 0.17517667 0.051343292
## recall                    1.0 0.54285717  0.7352941      0.5625
## residual_deviance   104.35602  103.97689  105.57458  113.998604
## rmse                0.4630586 0.45747635  0.4393554  0.46738735
## specificity        0.24561404 0.88235295  0.6315789   0.8245614