# Using SharedArrays in a parallel mapreduce

One might want to carry out a computation across several nodes of a cluster, where each node works on its own shared array. This may be achieved by using a `WorkerPool` that consists of one worker per node, each of which acts as a root process, launching tasks on its node and eventually returning the local array for an overall reduction across nodes.

We walk through one such example where we concatenate arrays that are initialized locally on each node.

We load the necessary packages; in this case these are `ParallelUtilities`, `SharedArrays` and `Distributed`.

```julia
using ParallelUtilities
using SharedArrays
using Distributed
```
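
These packages, along with the functions defined below, must be available on every worker. A minimal sketch of one way to set this up, assuming the worker processes have already been added (for example by the cluster manager), is to wrap the loads in `@everywhere`; the `@everywhere include` call at the end of this page achieves the same thing for the full script.

```julia
# Sketch: load the packages on the master process and on all workers.
# Assumes the workers have already been added, e.g. by the cluster manager.
using Distributed
@everywhere begin
    using ParallelUtilities
    using SharedArrays
end
```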

We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period; in other words, we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices`.

```julia
function initialize_localpart(s, sleeptime)
    # fill only the block of the shared array assigned to this worker
    for ind in localindices(s)
        sleep(sleeptime)
        s[ind] = ind
    end
end
```
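
As a quick illustration of how `localindices` splits the work, each participating worker sees a different contiguous range of indices. A small sketch, assuming at least two workers are available on the current host (the exact ranges depend on how many workers share the array):

```julia
# Sketch: inspect how a small SharedArray is partitioned across the
# workers on this host. The exact ranges depend on the number of workers.
pids = ParallelUtilities.workers_myhost()
s = SharedArray{Int}((10,), pids = pids)
for p in pids
    println(p, " => ", remotecall_fetch(localindices, p, s))
end
# with two workers one might see, e.g., 2 => 1:5 and 3 => 6:10
```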

We create a function that runs on the root worker on each node and feeds tasks to the other workers on that node. We use the function `ParallelUtilities.workers_myhost()` to obtain a list of all workers on the same node. We create the `SharedArray` on these workers so that it is entirely contained on one machine, which is achieved by passing the keyword argument `pids` to the `SharedArray` constructor. We asynchronously spawn tasks to initialize the local parts of the shared array on each worker, wait for them to complete, and return the underlying array.

```julia
function initializenode_sharedarray(sleeptime)
    # obtain the workers on the local machine
    pids = ParallelUtilities.workers_myhost()

    # create a shared array spread across the workers on that node
    s = SharedArray{Int}((2_000,), pids = pids)

    # spawn remote tasks to initialize the local parts of the shared array,
    # and wait for all of them to finish
    @sync for p in pids
        @spawnat p initialize_localpart(s, sleeptime)
    end
    return sdata(s)
end
```
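
Before wiring this into a parallel mapreduce, one might check the node-local step by itself by running it on a single worker. A minimal sketch, assuming the functions above have been defined on all workers:

```julia
# Sketch: run the node-local initialization on one worker and fetch the result.
# Assumes the functions above are defined on every worker (e.g. via @everywhere).
p = first(workers())
arr = remotecall_fetch(initializenode_sharedarray, p, 1e-3)
arr == 1:2_000   # the local parts together fill the entire array
```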

We create a main function that runs on the calling process and concatenates the arrays from each node. This is run on a `WorkerPool` consisting of one worker per node, each of which acts as the root process on its node. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. Finally, we call `pmapreduce` with a mapping function that initializes an array on each node, followed by a concatenation across the nodes.

```julia
function main_sharedarray(sleeptime)
    # obtain the worker pool with one process on each node
    pool = ParallelUtilities.workerpool_nodes()

    # obtain the number of workers in the pool
    nw_node = nworkers(pool)

    # evaluate the parallel mapreduce
    pmapreduce(x -> initializenode_sharedarray(sleeptime), hcat, pool, 1:nw_node)
end
```
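
The result is a matrix with one column of length `2000` per node; for instance, on a two-node cluster one would expect a `2000 × 2` matrix. A hypothetical call might look like the following (the exact size depends on the number of nodes):

```julia
julia> B = main_sharedarray(1e-3);

julia> size(B)   # one column per node, here assuming two nodes
(2000, 2)
```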

We compare the results with a serial execution that uses a similar workflow, except we use an `Array` instead of a `SharedArray` and `mapreduce` instead of `pmapreduce`.

```julia
function initialize_serial(sleeptime)
    s = Array{Int}(undef, 2_000)
    for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end

function main_serial(sleeptime)
    pool = ParallelUtilities.workerpool_nodes()
    nw_node = nworkers(pool)
    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_node)
end
```

We create a function to compare the performance of the two. We start with a run with zero sleep time to compile the functions, followed by the actual timed runs.

```julia
function compare_with_serial()
    # compile the functions with a zero sleep time
    main_serial(0)
    main_sharedarray(0)

    # time
    println("Testing serial")
    A = @time main_serial(5e-3)
    println("Testing sharedarray")
    B = @time main_sharedarray(5e-3)

    println("Results match : ", A == B)
end
```

We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:

```julia
julia> compare_with_serial()
Testing serial
 24.624912 seconds (27.31 k allocations: 1.017 MiB)
  1.077752 seconds (4.60 k allocations: 246.281 KiB)
Results match : true
```

The full script may be found [here](sharedarrays.jl). To run this, use

```julia
julia> @everywhere include("sharedarrays.jl")

julia> SharedArraysTiming.compare_with_serial()
```