{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Basic commands" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Sys.CPU_CORES " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs(Sys.CPU_CORES -1) # Number of processes added (e.g.: 3)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nprocs() " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nworkers()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "procs() " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workers() " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# rmprocs(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## First approach" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sending work" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1 = remotecall(rand, 2, 2, 2) # Function, id of the worker, args of the function." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting the result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fetch(r1) # We get the value." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1[2, 2]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# However, notice that r1 is still a future.\n", "# This implies that ordinary operation are not definied\n", "sum(r1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# For that, we have to save the output of the future.\n", "# You will see that we do this most of the times\n", "r2 = fetch(r1) " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sum(r2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Additional functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r3 = remotecall_fetch(rand, 2, 2, 2) # instead of fetch(remotecall())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sum(r3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1 = remotecall(rand, 2, 2, 2)\n", "s1 = @spawnat 2 1 .+ fetch(r1) # id process, expression: 1 .+ fetch(r1)\n", "fetch(s1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s2 = @spawn rand(2, 2) # Notice that there is no \"at\", so we do not specify the worker.\n", "s3 = @spawn 1 .+ fetch(s2) # It is Julia who select it.\n", "fetch(s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using our own functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function eig_sum(A)\n", " autoVal, autoVec = eig(A);\n", " return sum(autoVal)\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "eig_sum(rand(2, 2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s1 = @spawnat 1 eig_sum(rand(2, 2))\n", "fetch(s1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s2 = @spawnat 2 eig_sum(rand(2, 2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fetch(s2) # returns an error." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere function eig_sum(A) # Now all the processes know about the function.\n", " autoVal, autoVec = eig(A);\n", " return sum(autoVal)\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = @spawnat 2 eig_sum(rand(2, 2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fetch(s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data movements" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We must be cautious when working with global variables!!!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "A = rand(2, 2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "whos() # We see that A is created locally." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@spawnat 2 whos() # We see that there is nothing in the second process." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = @spawnat 2 eig_sum(A) " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@spawnat 2 whos() " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X = rand(4, 4) # We create a new Matrix X.\n", "whos() # We see how it is in the first process, but not in the second one." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@spawnat 2 whos()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "let B = X\n", " s3 = @spawnat 2 eig_sum(B)\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@spawnat 2 whos()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallel map and loops" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n = 200000000;\n", "nheads = @parallel (+) for i = 1:n\n", " Int(rand(Bool))\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "piAprox = 0.0; # Recall, pi = 3.1415926...\n", "piAprox = @parallel (+) for i = 1:n\n", " Int(rand()^2 + rand()^2 <= 1);\n", "end\n", "piAprox /= n/4 # Equivalent to piAprox = piAprox*4/n." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Be careful!!!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a = zeros(100000);\n", "@parallel for i = 1:100000\n", " a[i] = i;\n", "end\n", "a # Nothing has changed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### We can use our own functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n = 100000;\n", "x = rand(n);\n", "y = rand(n);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere function inside(x, y) return Int(x^2 + y^2 <= 1) end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "piAprox = 0.0;\n", "piAprox = @parallel (+) for i=1:n\n", " inside(x[i], y[i]);\n", "end\n", "\n", "piAprox /= n/4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Comparing times" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Ordinary for loop.\n", "a = 0;\n", "@time for i = 1:20000\n", " a += Int(rand(Bool));\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Predefined function.\n", "@time sum(rand(0:1, 20000))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Parallel for loop\n", "@time @parallel (+) for i = 1:20000\n", " Int(rand(Bool));\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# For no so small amount of work:\n", "n = 200000000;\n", "@time @parallel (+) for i = 1:n\n", " Int(rand(Bool))\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### pmap" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "M = Matrix{Float64}[rand(100, 100) for i = 1:5];" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pmap(svd, M); # Compute the svd for each of them." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dynamic scheduling" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "M = Matrix{Float64}[rand(800, 800), rand(600, 600), rand(800, 800), rand(600, 600)];" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function f_pmap(f, lst)\n", " np = nprocs() # Number of processes available.\n", " n = length(lst) # Number of elements to apply the function.\n", " results = Vector{Any}(n) # Where we will write the results. As we do not know\n", " # the type (Integer, Tuple...) we write \"Any\"\n", " i = 1\n", " nextidx() = (idx = i; i += 1; idx) # Function to know which is the next work item.\n", " # In this case it is just an index.\n", " @sync begin # See below the discussion about all this part.\n", " for p=1:np\n", " if p != myid() || np == 1\n", " @async begin\n", " while true\n", " idx = nextidx()\n", " if idx > n\n", " break\n", " end\n", " results[idx] = remotecall_fetch(f, p, lst[idx])\n", " end\n", " end\n", " end\n", " end\n", " end\n", " results\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result = f_pmap(svd, M);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Communication" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Channels" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "const jobs = Channel{Int}(32); # Here we can save at maximum 32 integers.\n", "const results = Channel{Tuple}(32);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function do_work()\n", " for job_id in jobs\n", " exec_time = rand()\n", " sleep(exec_time) # Simulates elapsed time doing actual work.\n", " # Typically performed externally.\n", " put!(results, (job_id, exec_time)) # To write elements in a channel we \"put\" them.\n", " end\n", "end;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function make_jobs(n)\n", " for i in 1:n\n", " put!(jobs, i)\n", " end\n", "end;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n = 12;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@schedule make_jobs(n); # Feed the jobs channel with \"n\" jobs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in 1:4 # Start 4 tasks to process requests in parallel.\n", " @schedule do_work()\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@elapsed while n > 0 # Print out results.\n", " job_id, exec_time = take!(results) # To get elements from a channel we \"take\" them.\n", " println(\"$job_id finished in $(round(exec_time, 2)) seconds\")\n", " n = n - 1\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Remote channels" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "const jobs = RemoteChannel(()->Channel{Int}(32));\n", "const results = RemoteChannel(()->Channel{Tuple}(32));" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere function do_work(jobs, results) # Define work function everywhere.\n", " while true\n", " job_id = take!(jobs)\n", " exec_time = rand()\n", " sleep(exec_time)\n", " put!(results, (job_id, exec_time, myid()))\n", " end\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function make_jobs(n)\n", " for i in 1:n\n", " put!(jobs, i)\n", " end\n", "end;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n = 12;\n", "\n", "@schedule make_jobs(n); # Feed the jobs channel with \"n\" jobs.\n", "\n", "for p in workers() # Start tasks on the workers to process requests in parallel.\n", " @async remote_do(do_work, p, jobs, results) # Similar to remotecall.\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@elapsed while n > 0 # Print out results.\n", " job_id, exec_time, where = take!(results)\n", " println(\"$job_id finished in $(round(exec_time,2)) seconds on worker $where\")\n", " n = n - 1\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Shared Arrays" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S = SharedArray{Int, 2}((3, 4))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S = SharedArray{Int, 2}((4, 4), init = S -> S[collect(1:5:16)] = 1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S[3, 2] = 7" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now we can modify arrays!!!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x = rand(0:5, 1000);\n", "c = rand(0:1, 1000);\n", "output = SharedArray{Int, 1}(1000);\n", "\n", "@parallel (+) for i = 1:1000\n", " output[i] = x[i]*c[i];\n", "end\n", "\n", "output # We have been able to modify the array." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Last example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Including packages needed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere using DataFrames # We will need all the process to use this type.\n", "using RDatasets" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating the input" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We read the dataset and pick \"npoints\" to classify.\n", "df_data = dataset(\"datasets\", \"iris\");\n", "npoints = 10; # In this case we will classify only 10 points.\n", "sample = rand(1:size(df_data)[1], npoints);\n", "sample = unique(sample); # Just in case there are elements repeated." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We save the characteristics of the points to classify in a new dataset \"df_clas\"\n", "# and delete them from the original one.\n", "df_clas = df_data[sample, :];\n", "deleterows!(df_data, sample);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating auxiliary types and functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere type Point\n", " d::Float64; # distance.\n", " g::AbstractString; # group.\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We need this function to sort Points.\n", "@everywhere function getdist(x::Point)\n", " return x.d\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We CAN NOT use a dot product because we have DataFrames NO arrays!\n", "# In the function, 'n' is the number of the columns in the dataset.\n", "@everywhere function distance(df_data::DataFrame, df_clas::DataFrame, n::Int)\n", " d = 0.0;\n", " for k = 1:(n-1)\n", " d += (df_data[1, k] - df_clas[1, k])^2\n", " end\n", " return d; # We return the square of the euclidean distance.\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Main function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere function knn(df_data::DataFrame, df_clas::DataFrame, K::Int)\n", " neighbors = Array{Point, 1}(K); # Where we save the K nearest neighbors.\n", "\n", " lastelem = ncol(df_data);\n", " dist = 0.0; # Variable to save the distances.\n", " maxdist = 0.0; # Variable to save the larger distance within array \"neighbors\".\n", " namegroup = \"\";\n", "\n", " vnames = Array{AbstractString, 1}(K); # Vector of names to make the final count.\n", " groups = unique(df_data[:, lastelem]); # We collect which are the groups.\n", " gcount = zeros(Int, length(groups)); # Number of times each group appears.\n", "\n", " # To keep this simple, we assume that nrow(df_data) > K.\n", " # We select the first K neighbors.\n", " for j = 1:K\n", " dist = distance(df_data[j, :], df_clas[1, :], lastelem);\n", " namegroup = df_data[j, lastelem];\n", " neighbors[j] = Point(dist, namegroup);\n", " end\n", "\n", " # We sort by distance and get the MAXIMUM value\n", " sort!(neighbors, by = getdist);\n", " maxdist = neighbors[K].d;\n", "\n", " # We compare with the rest of the points.\n", " for j = K:nrow(df_data)\n", " dist = distance(df_data[j, :], df_clas[1, :], lastelem);\n", " if dist < maxdist\n", " neighbors[K].d = dist;\n", " neighbors[K].g = df_data[j, lastelem];\n", " sort!(neighbors, by = getdist);\n", " maxdist = neighbors[K].d;\n", " end\n", " end\n", "\n", " # We classify the point.\n", " # First, we get the names of the groups in the array \"neighbors\".\n", " for j = 1:K\n", " vnames[j] = neighbors[j].g; \n", " end\n", "\n", " # Second, we count how many times appears each of them.\n", " for j = 1:length(groups)\n", " gcount[j] = count(s->(s == groups[j]), vnames);\n", " end\n", " pos = find(gcount .== maximum(gcount));\n", " pos = rand(pos, 1); # We sample vector pos in case of draws.\n", "\n", " # Third, we return the result.\n", " return groups[pos[1]];\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing it (no parallel)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result = Array{AbstractString, 1}(nrow(df_clas)); # Vector with the solution.\n", "@time for i = 1:nrow(df_clas)\n", " result[i] = knn(df_data, df_clas[i, :], 5); # We use K = 5.\n", "end\n", "result\n", "df_clas" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parallel implementation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function knn_parallel(df_data::DataFrame, df_clas::DataFrame, counter::Int,\n", " output::Array{AbstractString, 1})\n", " @sync begin\n", " for p in workers()\n", " @async begin\n", " while true\n", " idx = counter - 1;\n", " counter -= 1; # Why do not we need a Shared Array?\n", " if idx <= 0\n", " break;\n", " end\n", " output[idx] = remotecall_fetch(knn, p, df_data, df_clas[idx,:], 5);\n", " end\n", " end\n", " end\n", " end\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Input required.\n", "counter = nrow(df_clas) + 1;\n", "output = Array{AbstractString, 1}(nrow(df_clas));" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Function.\n", "@time knn_parallel(df_data, df_clas, counter, output)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The result.\n", "output\n", "df_clas" ] } ], "metadata": { "kernelspec": { "display_name": "Julia 0.6.2", "language": "julia", "name": "julia-0.6" }, "language_info": { "file_extension": ".jl", "mimetype": "application/julia", "name": "julia", "version": "0.6.2" } }, "nbformat": 4, "nbformat_minor": 2 }