## Basic commands

In [None]:
Sys.CPU_CORES 

In [None]:
addprocs(Sys.CPU_CORES -1) # Number of processes added (e.g.: 3).

In [None]:
nprocs() 

In [None]:
nworkers()

In [None]:
procs() 

In [None]:
workers() 

In [None]:
# rmprocs(2)

## First approach

### Sending work

In [None]:
r1 = remotecall(rand, 2, 2, 2) # Function, id of the worker, args of the function.

### Getting the result

In [None]:
fetch(r1) # We get the value.

In [None]:
r1

In [None]:
r1[2, 2]

In [None]:
# However, notice that r1 is still a future.
# This implies that ordinary operation are not definied
sum(r1)

In [None]:
# For that, we have to save the output of the future.
# You will see that we do this most of the times
r2 = fetch(r1) 

In [None]:
sum(r2)

### Additional functions

In [None]:
r3 = remotecall_fetch(rand, 2, 2, 2) # instead of fetch(remotecall())

In [None]:
sum(r3)

In [None]:
r1 = remotecall(rand, 2, 2, 2)
s1 = @spawnat 2 1 .+ fetch(r1) # id process, expression: 1 .+ fetch(r1)
fetch(s1)

In [None]:
s2 = @spawn rand(2, 2) # Notice that there is no "at", so we do not specify the worker.
s3 = @spawn 1 .+ fetch(s2) # It is Julia who select it.
fetch(s3)

### Using our own functions

In [None]:
function eig_sum(A)
 autoVal, autoVec = eig(A);
 return sum(autoVal)
end

In [None]:
eig_sum(rand(2, 2))

In [None]:
s1 = @spawnat 1 eig_sum(rand(2, 2))
fetch(s1)

In [None]:
s2 = @spawnat 2 eig_sum(rand(2, 2))

In [None]:
fetch(s2) # returns an error.

In [None]:
@everywhere function eig_sum(A) # Now all the processes know about the function.
 autoVal, autoVec = eig(A);
 return sum(autoVal)
end

In [None]:
s3 = @spawnat 2 eig_sum(rand(2, 2))

In [None]:
fetch(s3)

## Data movements

#### We must be cautious when working with global variables!!!

In [None]:
A = rand(2, 2)

In [None]:
whos() # We see that A is created locally.

In [None]:
@spawnat 2 whos() # We see that there is nothing in the second process.

In [None]:
s3 = @spawnat 2 eig_sum(A) 

In [None]:
@spawnat 2 whos() 

In [None]:
X = rand(4, 4) # We create a new Matrix X.
whos() # We see how it is in the first process, but not in the second one.

In [None]:
@spawnat 2 whos()

In [None]:
let B = X
 s3 = @spawnat 2 eig_sum(B)
end

In [None]:
@spawnat 2 whos()

## Parallel map and loops

In [None]:
n = 200000000;
nheads = @parallel (+) for i = 1:n
 Int(rand(Bool))
end

In [None]:
piAprox = 0.0; # Recall, pi = 3.1415926...
piAprox = @parallel (+) for i = 1:n
 Int(rand()^2 + rand()^2 <= 1);
end
piAprox /= n/4 # Equivalent to piAprox = piAprox*4/n.

### Be careful!!!

In [None]:
a = zeros(100000);
@parallel for i = 1:100000
 a[i] = i;
end
a # Nothing has changed.

### We can use our own functions

In [None]:
n = 100000;
x = rand(n);
y = rand(n);

In [None]:
@everywhere function inside(x, y) return Int(x^2 + y^2 <= 1) end

In [None]:
piAprox = 0.0;
piAprox = @parallel (+) for i=1:n
 inside(x[i], y[i]);
end

piAprox /= n/4

### Comparing times

In [None]:
# Ordinary for loop.
a = 0;
@time for i = 1:20000
 a += Int(rand(Bool));
end

In [None]:
# Predefined function.
@time sum(rand(0:1, 20000))

In [None]:
# Parallel for loop
@time @parallel (+) for i = 1:20000
 Int(rand(Bool));
end

In [None]:
# For no so small amount of work:
n = 200000000;
@time @parallel (+) for i = 1:n
 Int(rand(Bool))
end

### pmap

In [None]:
M = Matrix{Float64}[rand(100, 100) for i = 1:5];

In [None]:
pmap(svd, M); # Compute the svd for each of them.

## Dynamic scheduling

In [None]:
M = Matrix{Float64}[rand(800, 800), rand(600, 600), rand(800, 800), rand(600, 600)];

In [None]:
function f_pmap(f, lst)
 np = nprocs() # Number of processes available.
 n = length(lst) # Number of elements to apply the function.
 results = Vector{Any}(n) # Where we will write the results. As we do not know
 # the type (Integer, Tuple...) we write "Any"
 i = 1
 nextidx() = (idx = i; i += 1; idx) # Function to know which is the next work item.
 # In this case it is just an index.
 @sync begin # See below the discussion about all this part.
 for p=1:np
 if p != myid() || np == 1
 @async begin
 while true
 idx = nextidx()
 if idx > n
 break
 end
 results[idx] = remotecall_fetch(f, p, lst[idx])
 end
 end
 end
 end
 end
 results
end

In [None]:
result = f_pmap(svd, M);

In [None]:
result

## Communication

### Channels

In [None]:
const jobs = Channel{Int}(32); # Here we can save at maximum 32 integers.
const results = Channel{Tuple}(32);

In [None]:
function do_work()
 for job_id in jobs
 exec_time = rand()
 sleep(exec_time) # Simulates elapsed time doing actual work.
 # Typically performed externally.
 put!(results, (job_id, exec_time)) # To write elements in a channel we "put" them.
 end
end;

In [None]:
function make_jobs(n)
 for i in 1:n
 put!(jobs, i)
 end
end;

In [None]:
n = 12;

In [None]:
@schedule make_jobs(n); # Feed the jobs channel with "n" jobs.

In [None]:
for i in 1:4 # Start 4 tasks to process requests in parallel.
 @schedule do_work()
end

In [None]:
@elapsed while n > 0 # Print out results.
 job_id, exec_time = take!(results) # To get elements from a channel we "take" them.
 println("$job_id finished in $(round(exec_time, 2)) seconds")
 n = n - 1
end

### Remote channels

In [None]:
const jobs = RemoteChannel(()->Channel{Int}(32));
const results = RemoteChannel(()->Channel{Tuple}(32));

In [None]:
@everywhere function do_work(jobs, results) # Define work function everywhere.
 while true
 job_id = take!(jobs)
 exec_time = rand()
 sleep(exec_time)
 put!(results, (job_id, exec_time, myid()))
 end
end

In [None]:
function make_jobs(n)
 for i in 1:n
 put!(jobs, i)
 end
end;

In [None]:
n = 12;

@schedule make_jobs(n); # Feed the jobs channel with "n" jobs.

for p in workers() # Start tasks on the workers to process requests in parallel.
 @async remote_do(do_work, p, jobs, results) # Similar to remotecall.
end

In [None]:
@elapsed while n > 0 # Print out results.
 job_id, exec_time, where = take!(results)
 println("$job_id finished in $(round(exec_time,2)) seconds on worker $where")
 n = n - 1
end

## Shared Arrays

In [None]:
S = SharedArray{Int, 2}((3, 4))

In [None]:
S = SharedArray{Int, 2}((4, 4), init = S -> S[collect(1:5:16)] = 1)

In [None]:
S[3, 2] = 7

In [None]:
S

### Now we can modify arrays!!!

In [None]:
x = rand(0:5, 1000);
c = rand(0:1, 1000);
output = SharedArray{Int, 1}(1000);

@parallel (+) for i = 1:1000
 output[i] = x[i]*c[i];
end

output # We have been able to modify the array.

## Last example

### Including packages needed

In [None]:
@everywhere using DataFrames # We will need all the process to use this type.
using RDatasets

### Creating the input

In [None]:
# We read the dataset and pick "npoints" to classify.
df_data = dataset("datasets", "iris");
npoints = 10; # In this case we will classify only 10 points.
sample = rand(1:size(df_data)[1], npoints);
sample = unique(sample); # Just in case there are elements repeated.

In [None]:
# We save the characteristics of the points to classify in a new dataset "df_clas"
# and delete them from the original one.
df_clas = df_data[sample, :];
deleterows!(df_data, sample);

### Creating auxiliary types and functions

In [None]:
@everywhere type Point
 d::Float64; # distance.
 g::AbstractString; # group.
end

In [None]:
# We need this function to sort Points.
@everywhere function getdist(x::Point)
 return x.d
end

In [None]:
# We CAN NOT use a dot product because we have DataFrames NO arrays!
# In the function, 'n' is the number of the columns in the dataset.
@everywhere function distance(df_data::DataFrame, df_clas::DataFrame, n::Int)
 d = 0.0;
 for k = 1:(n-1)
 d += (df_data[1, k] - df_clas[1, k])^2
 end
 return d; # We return the square of the euclidean distance.
end

### Main function

In [None]:
@everywhere function knn(df_data::DataFrame, df_clas::DataFrame, K::Int)
 neighbors = Array{Point, 1}(K); # Where we save the K nearest neighbors.

 lastelem = ncol(df_data);
 dist = 0.0; # Variable to save the distances.
 maxdist = 0.0; # Variable to save the larger distance within array "neighbors".
 namegroup = "";

 vnames = Array{AbstractString, 1}(K); # Vector of names to make the final count.
 groups = unique(df_data[:, lastelem]); # We collect which are the groups.
 gcount = zeros(Int, length(groups)); # Number of times each group appears.

 # To keep this simple, we assume that nrow(df_data) > K.
 # We select the first K neighbors.
 for j = 1:K
 dist = distance(df_data[j, :], df_clas[1, :], lastelem);
 namegroup = df_data[j, lastelem];
 neighbors[j] = Point(dist, namegroup);
 end

 # We sort by distance and get the MAXIMUM value
 sort!(neighbors, by = getdist);
 maxdist = neighbors[K].d;

 # We compare with the rest of the points.
 for j = K:nrow(df_data)
 dist = distance(df_data[j, :], df_clas[1, :], lastelem);
 if dist < maxdist
 neighbors[K].d = dist;
 neighbors[K].g = df_data[j, lastelem];
 sort!(neighbors, by = getdist);
 maxdist = neighbors[K].d;
 end
 end

 # We classify the point.
 # First, we get the names of the groups in the array "neighbors".
 for j = 1:K
 vnames[j] = neighbors[j].g; 
 end

 # Second, we count how many times appears each of them.
 for j = 1:length(groups)
 gcount[j] = count(s->(s == groups[j]), vnames);
 end
 pos = find(gcount .== maximum(gcount));
 pos = rand(pos, 1); # We sample vector pos in case of draws.

 # Third, we return the result.
 return groups[pos[1]];
end

### Testing it (no parallel)

In [None]:
result = Array{AbstractString, 1}(nrow(df_clas)); # Vector with the solution.
@time for i = 1:nrow(df_clas)
 result[i] = knn(df_data, df_clas[i, :], 5); # We use K = 5.
end
result
df_clas

### Parallel implementation

In [None]:
function knn_parallel(df_data::DataFrame, df_clas::DataFrame, counter::Int,
 output::Array{AbstractString, 1})
 @sync begin
 for p in workers()
 @async begin
 while true
 idx = counter - 1;
 counter -= 1; # Why do not we need a Shared Array?
 if idx <= 0
 break;
 end
 output[idx] = remotecall_fetch(knn, p, df_data, df_clas[idx,:], 5);
 end
 end
 end
 end
end

In [None]:
# Input required.
counter = nrow(df_clas) + 1;
output = Array{AbstractString, 1}(nrow(df_clas));

In [None]:
# Function.
@time knn_parallel(df_data, df_clas, counter, output)

In [None]:
# The result.
output
df_clas