# Packages used throughout: the tidyverse (dplyr, tidyr, purrr, ggplot2, ...),
# the gapminder data set, and stringr.
library(tidyverse)
library(gapminder)
library(stringr)
# Fix the random number generator seed so anything relying on random numbers
# is reproducible.
set.seed(1234)
# Use the minimal ggplot2 theme for all plots.
theme_set(theme_minimal())
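A common analytical pattern is to split data into independent pieces, apply some function to each piece, and then combine the results back together (the *split-apply-combine* strategy).
We have used this technique many times so far without explicitly identifying it as such.
`dplyr::group_by()`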
# Number of rows (observations) for each continent, in a column named `n`.
gapminder %>%
  count(continent)
## # A tibble: 5 x 2
## continent n
## <fctr> <int>
## 1 Africa 624
## 2 Americas 300
## 3 Asia 396
## 4 Europe 360
## 5 Oceania 24
# Mean life expectancy within each continent.
group_by(gapminder, continent) %>%
  summarize(avg_lifeExp = mean(lifeExp))
## # A tibble: 5 x 2
## continent avg_lifeExp
## <fctr> <dbl>
## 1 Africa 48.86533
## 2 Americas 64.65874
## 3 Asia 60.06490
## 4 Europe 71.90369
## 5 Oceania 74.32621
# for loops
# Fit a separate linear model of life expectancy on year for each country.
# (The variable must be `countries`: the loop below indexes it by that name.)
countries <- unique(gapminder$country)

# Preallocate one list slot per country and label each slot with the
# country's name so the printed models are easy to identify.
lifeExp_models <- vector("list", length(countries))
names(lifeExp_models) <- as.character(countries)

for (i in seq_along(countries)) {
  lifeExp_models[[i]] <- lm(lifeExp ~ year,
                            data = filter(gapminder,
                                          country == countries[[i]]))
}

head(lifeExp_models)
## $Afghanistan
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -507.5343 0.2753
##
##
## $Albania
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -594.0725 0.3347
##
##
## $Algeria
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -1067.8590 0.5693
##
##
## $Angola
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -376.5048 0.2093
##
##
## $Argentina
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -389.6063 0.2317
##
##
## $Australia
##
## Call:
## lm(formula = lifeExp ~ year, data = filter(gapminder, country ==
## countries[[i]]))
##
## Coefficients:
## (Intercept) year
## -376.1163 0.2277
`nest()` and `map()`
# Fit a straight-line trend of life expectancy on year for a single data
# frame `df` (which must contain `lifeExp` and `year` columns) and return
# the resulting `lm` object.
le_vs_yr <- function(df) lm(formula = lifeExp ~ year, data = df)
# split data into nests: one row per continent/country pair, with that
# country's remaining columns stored as a tibble in the list-column `data`
gap_nested <- gapminder %>%
  group_by(continent, country) %>%
  nest()
gap_nested
## # A tibble: 142 x 3
## continent country data
## <fctr> <fctr> <list>
## 1 Asia Afghanistan <tibble [12 x 4]>
## 2 Europe Albania <tibble [12 x 4]>
## 3 Africa Algeria <tibble [12 x 4]>
## 4 Africa Angola <tibble [12 x 4]>
## 5 Americas Argentina <tibble [12 x 4]>
## 6 Oceania Australia <tibble [12 x 4]>
## 7 Europe Austria <tibble [12 x 4]>
## 8 Asia Bahrain <tibble [12 x 4]>
## 9 Asia Bangladesh <tibble [12 x 4]>
## 10 Europe Belgium <tibble [12 x 4]>
## # ... with 132 more rows
# apply a linear model to each nested data frame, keeping the fitted
# models in a new list-column `fit`
gap_nested <- mutate(gap_nested, fit = map(data, le_vs_yr))
gap_nested
## # A tibble: 142 x 4
## continent country data fit
## <fctr> <fctr> <list> <list>
## 1 Asia Afghanistan <tibble [12 x 4]> <S3: lm>
## 2 Europe Albania <tibble [12 x 4]> <S3: lm>
## 3 Africa Algeria <tibble [12 x 4]> <S3: lm>
## 4 Africa Angola <tibble [12 x 4]> <S3: lm>
## 5 Americas Argentina <tibble [12 x 4]> <S3: lm>
## 6 Oceania Australia <tibble [12 x 4]> <S3: lm>
## 7 Europe Austria <tibble [12 x 4]> <S3: lm>
## 8 Asia Bahrain <tibble [12 x 4]> <S3: lm>
## 9 Asia Bangladesh <tibble [12 x 4]> <S3: lm>
## 10 Europe Belgium <tibble [12 x 4]> <S3: lm>
## # ... with 132 more rows
# combine the results back into a single data frame
library(broom)
# Turn each fitted model into a tidy table of coefficients. broom's tidy()
# is namespace-qualified because this mutate() creates a column named
# `tidy`: if the chunk is re-run on the already-updated gap_nested, a bare
# `tidy` would resolve to that existing list-column (columns take precedence
# over functions inside mutate()) instead of the broom function.
(gap_nested <- gap_nested %>%
  mutate(tidy = map(fit, broom::tidy)))
## # A tibble: 142 x 5
## continent country data fit tidy
## <fctr> <fctr> <list> <list> <list>
## 1 Asia Afghanistan <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 2 Europe Albania <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 3 Africa Algeria <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 4 Africa Angola <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 5 Americas Argentina <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 6 Oceania Australia <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 7 Europe Austria <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 8 Asia Bahrain <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 9 Asia Bangladesh <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## 10 Europe Belgium <tibble [12 x 4]> <S3: lm> <data.frame [2 x 5]>
## # ... with 132 more rows
# Flatten the per-country coefficient tables into one long tibble with a
# row for each country and model term.
gap_coefs <- gap_nested %>%
  select(continent, country, tidy) %>%
  unnest(tidy)
gap_coefs
## # A tibble: 284 x 7
## continent country term estimate std.error statistic
## <fctr> <fctr> <chr> <dbl> <dbl> <dbl>
## 1 Asia Afghanistan (Intercept) -507.5342716 40.484161954 -12.536613
## 2 Asia Afghanistan year 0.2753287 0.020450934 13.462890
## 3 Europe Albania (Intercept) -594.0725110 65.655359062 -9.048348
## 4 Europe Albania year 0.3346832 0.033166387 10.091036
## 5 Africa Algeria (Intercept) -1067.8590396 43.802200843 -24.379118
## 6 Africa Algeria year 0.5692797 0.022127070 25.727749
## 7 Africa Angola (Intercept) -376.5047531 46.583370599 -8.082385
## 8 Africa Angola year 0.2093399 0.023532003 8.895964
## 9 Americas Argentina (Intercept) -389.6063445 9.677729641 -40.258031
## 10 Americas Argentina year 0.2317084 0.004888791 47.395847
## # ... with 274 more rows, and 1 more variables: p.value <dbl>
Parallel computing (or processing) is a type of computation whereby many calculations or processes are carried out simultaneously.^1^ Rather than processing problems in serial (or sequential) order, the computer splits the task into smaller parts that can be processed simultaneously using multiple processors. (Multithreading, in which a single process runs several threads at once, is one way to achieve this.) By splitting the job into simultaneous operations running in parallel, you can finish it sooner, making the code more efficient. This approach works well with split-apply-combine because each of the applied operations can be run independently. Why wait for the first chunk to complete if you can perform the operation on the second chunk at the same time?
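Limits to efficiency gains - Amdahl’s law defines theoretical limits to how much you can speed up computations via parallel computing. The part of the work that must run serially cannot be sped up. If a fraction $p$ of the work can be parallelized across $N$ processors, the overall speedup is at most $S = \frac{1}{(1 - p) + p/N}$, which can never exceed $\frac{1}{1 - p}$ no matter how many processors you add. Because of this, each additional processor yields a smaller gain, so you achieve diminishing returns.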
Parallel slowdown - parallel computing speeds up computations at a price. Once the problem is broken into separate parts, moving data between those parts and memory or the hard drive takes time. Some tasks are not sped up at all by splitting the process into parallel operations, and the extra overhead can even make them slower.
multidplyr
`multidplyr` is a work-in-progress package that implements parallel computing locally using `dplyr`. Rather than performing computations using a single core or processor, it spreads the computation across multiple cores. The basic sequence of steps is:
1. Call `partition()` to split the dataset across multiple cores. This makes a partitioned data frame, or a `party_df` for short.
2. Each `dplyr` verb applied to a `party_df` performs the operation independently on each core. It leaves each result on its core and returns another `party_df`.
3. Call `collect()` to retrieve the data and bring it back to your local computer.

`nycflights13::flights`
Install `multidplyr` if you don’t have it already.
# Install multidplyr from GitHub only when it is not already available, so
# re-running this chunk does not reinstall the package every time.
# NOTE(review): multidplyr is described here as work in progress; if the
# examples below fail, its interface may have changed since they were
# written. Confirm against the installed version.
if (!requireNamespace("multidplyr", quietly = TRUE)) {
  devtools::install_github("hadley/multidplyr")
}
library(multidplyr)
library(nycflights13)
Next, partition the flights data by flight number, compute the average delay per flight, and then collect the results:
# Partition by flight number, compute each flight's mean departure delay on
# the worker nodes, then bring the results back into this R session.
flights1 <- flights %>% partition(flight)
flights2 <- flights1 %>%
  summarize(dep_delay = mean(dep_delay, na.rm = TRUE))
flights3 <- flights2 %>% collect()
The `dplyr` code looks the same as usual, but behind the scenes things are very different. `flights1` and `flights2` are `party_df`s. These look like normal data frames, but they have an additional attribute: the number of shards. In this example, it tells us that `flights2` is spread across three nodes, and that the number of rows on each node varies from 1,237 to 1,304. `partition()` always makes sure a group is kept together on one node.
# Show the partitioned result, including its shard summary.
print(flights2)
## Source: party_df [3,844 x 2]
## Shards: 3 [1,237--1,304 rows]
##
## # S3: party_df
## flight dep_delay
## <int> <dbl>
## 1 2 -0.5686275
## 2 3 3.6650794
## 3 4 7.5166240
## 4 6 8.5024155
## 5 8 6.9358974
## 6 10 24.3114754
## 7 11 6.8242991
## 8 12 28.2834646
## 9 15 10.2643080
## 10 19 10.0500000
## # ... with 3,834 more rows
For this size of data, using a local cluster actually makes performance slower.
# Time an ungrouped summary on the local cluster. With no partitioning
# variables there are no groups, so each node summarises its own shard
# independently; this chunk is only meant to show the overhead of sending
# data to the nodes and collecting the results.
system.time(
  flights %>%
    partition() %>%
    summarise(mean(dep_delay, na.rm = TRUE)) %>%
    collect()
)
## user system elapsed
## 0.474 0.066 0.968
# Time the same ungrouped summary in this R session, for comparison.
system.time(
  flights %>%
    summarise(mean(dep_delay, na.rm = TRUE))
)
## user system elapsed
## 0.007 0.000 0.006
That’s because there’s some overhead associated with sending the data to each node and retrieving the results at the end. For basic `dplyr` verbs, `multidplyr` is unlikely to give you significant speed-ups unless you have tens or hundreds of millions of data points. It might, however, if you’re doing more complex things.
gapminder
Let’s now return to `gapminder` and estimate separate linear regression models of life expectancy based on year for each country. We will use `multidplyr` to split the work across multiple cores. Note that we need to use `cluster_library()` to load the `purrr` package on every node.
# split data into nests: one row per continent/country pair
gap_nested <- gapminder %>%
  group_by(continent, country) %>%
  nest()

# partition gap_nested across the cores; each country stays on one node
gap_nested_part <- gap_nested %>%
  partition(country)

# load purrr on every node so map() is available there
cluster_library(gap_nested_part, "purrr")

# apply a linear model to each nested data frame on the nodes. The model
# function is written inline, presumably because le_vs_yr() defined in this
# session is not available on the worker nodes. collect() is included so the
# timing also covers returning the fitted models to this session, just as a
# sequential run leaves its results here.
system.time({
  gap_nested_part %>%
    mutate(fit = map(data, function(df) lm(lifeExp ~ year, data = df))) %>%
    collect()
})
## user system elapsed
## 0.002 0.000 0.113
How does that compare to running it locally?
# Time the same per-country model fitting sequentially in this R session.
system.time(
  gap_nested %>%
    mutate(fit = map(data, function(df) lm(lifeExp ~ year, data = df)))
)
## user system elapsed
## 0.109 0.002 0.112
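So running in parallel gives essentially no speed-up here: the elapsed (wall-clock) times are nearly identical (0.113 vs. 0.112 seconds). The much smaller `user` time for the parallel run is misleading. `system.time()` counts only the CPU time used by the main R process, not by the worker nodes that actually fitted the models. Either way, only a fraction of a second is at stake. This demonstrates that it doesn’t always make sense to parallelize operations - only do so if you can make significant gains in computation speed. If each country had thousands of observations, the potential efficiency gains would be more dramatic.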
# Print the R version, platform settings, and versions of the packages
# loaded when this document was rendered.
devtools::session_info()
## setting value
## version R version 3.4.3 (2017-11-30)
## system x86_64, darwin15.6.0
## ui X11
## language (EN)
## collate en_US.UTF-8
## tz America/Chicago
## date 2018-04-24
##
## package * version date source
## assertthat 0.2.0 2017-04-11 CRAN (R 3.4.0)
## backports 1.1.2 2017-12-13 CRAN (R 3.4.3)
## base * 3.4.3 2017-12-07 local
## bindr 0.1.1 2018-03-13 CRAN (R 3.4.3)
## bindrcpp 0.2.2.9000 2018-04-08 Github (krlmlr/bindrcpp@bd5ae73)
## broom 0.4.4 2018-03-29 CRAN (R 3.4.3)
## cellranger 1.1.0 2016-07-27 CRAN (R 3.4.0)
## cli 1.0.0 2017-11-05 CRAN (R 3.4.2)
## colorspace 1.3-2 2016-12-14 CRAN (R 3.4.0)
## compiler 3.4.3 2017-12-07 local
## crayon 1.3.4 2017-10-03 Github (gaborcsardi/crayon@b5221ab)
## datasets * 3.4.3 2017-12-07 local
## devtools 1.13.5 2018-02-18 CRAN (R 3.4.3)
## digest 0.6.15 2018-01-28 CRAN (R 3.4.3)
## dplyr * 0.7.4.9003 2018-04-08 Github (tidyverse/dplyr@b7aaa95)
## evaluate 0.10.1 2017-06-24 CRAN (R 3.4.1)
## forcats * 0.3.0 2018-02-19 CRAN (R 3.4.3)
## foreign 0.8-69 2017-06-22 CRAN (R 3.4.3)
## gapminder * 0.3.0 2017-10-31 CRAN (R 3.4.2)
## ggplot2 * 2.2.1.9000 2018-04-24 Github (tidyverse/ggplot2@3c9c504)
## glue 1.2.0 2017-10-29 CRAN (R 3.4.2)
## graphics * 3.4.3 2017-12-07 local
## grDevices * 3.4.3 2017-12-07 local
## grid 3.4.3 2017-12-07 local
## gtable 0.2.0 2016-02-26 CRAN (R 3.4.0)
## haven 1.1.1 2018-01-18 CRAN (R 3.4.3)
## hms 0.4.2 2018-03-10 CRAN (R 3.4.3)
## htmltools 0.3.6 2017-04-28 CRAN (R 3.4.0)
## httr 1.3.1 2017-08-20 CRAN (R 3.4.1)
## jsonlite 1.5 2017-06-01 CRAN (R 3.4.0)
## knitr 1.20 2018-02-20 CRAN (R 3.4.3)
## lattice 0.20-35 2017-03-25 CRAN (R 3.4.3)
## lazyeval 0.2.1 2017-10-29 CRAN (R 3.4.2)
## lubridate 1.7.4 2018-04-11 CRAN (R 3.4.3)
## magrittr 1.5 2014-11-22 CRAN (R 3.4.0)
## memoise 1.1.0 2017-04-21 CRAN (R 3.4.0)
## methods * 3.4.3 2017-12-07 local
## mnormt 1.5-5 2016-10-15 CRAN (R 3.4.0)
## modelr 0.1.1 2017-08-10 local
## munsell 0.4.3 2016-02-13 CRAN (R 3.4.0)
## nlme 3.1-137 2018-04-07 CRAN (R 3.4.4)
## parallel 3.4.3 2017-12-07 local
## pillar 1.2.1 2018-02-27 CRAN (R 3.4.3)
## pkgconfig 2.0.1 2017-03-21 CRAN (R 3.4.0)
## plyr 1.8.4 2016-06-08 CRAN (R 3.4.0)
## psych 1.8.3.3 2018-03-30 CRAN (R 3.4.4)
## purrr * 0.2.4 2017-10-18 CRAN (R 3.4.2)
## R6 2.2.2 2017-06-17 CRAN (R 3.4.0)
## Rcpp 0.12.16 2018-03-13 CRAN (R 3.4.4)
## readr * 1.1.1 2017-05-16 CRAN (R 3.4.0)
## readxl 1.0.0 2017-04-18 CRAN (R 3.4.0)
## reshape2 1.4.3 2017-12-11 CRAN (R 3.4.3)
## rlang 0.2.0.9001 2018-04-24 Github (r-lib/rlang@82b2727)
## rmarkdown 1.9 2018-03-01 CRAN (R 3.4.3)
## rprojroot 1.3-2 2018-01-03 CRAN (R 3.4.3)
## rstudioapi 0.7 2017-09-07 CRAN (R 3.4.1)
## rvest 0.3.2 2016-06-17 CRAN (R 3.4.0)
## scales 0.5.0.9000 2018-04-24 Github (hadley/scales@d767915)
## stats * 3.4.3 2017-12-07 local
## stringi 1.1.7 2018-03-12 CRAN (R 3.4.3)
## stringr * 1.3.0 2018-02-19 CRAN (R 3.4.3)
## tibble * 1.4.2 2018-01-22 CRAN (R 3.4.3)
## tidyr * 0.8.0 2018-01-29 CRAN (R 3.4.3)
## tidyselect 0.2.4 2018-02-26 CRAN (R 3.4.3)
## tidyverse * 1.2.1 2017-11-14 CRAN (R 3.4.2)
## tools 3.4.3 2017-12-07 local
## utils * 3.4.3 2017-12-07 local
## withr 2.1.2 2018-04-24 Github (jimhester/withr@79d7b0d)
## xml2 1.2.0 2018-01-24 CRAN (R 3.4.3)
## yaml 2.1.18 2018-03-08 CRAN (R 3.4.4)
This work is licensed under the CC BY-NC 4.0 Creative Commons License.