This repository was archived by the owner on Mar 12, 2021. It is now read-only.

Commit 2d0c06a
Implement two-staged serial+parallel mapreduce
1 parent: 1154509

1 file changed (+66 -25 lines)
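
Note: the commit replaces the single-pass `mapreduce_grid` kernel with a `partial_mapreduce_grid` kernel that may leave one partial result per cooperating block, and those partials are then reduced once more into the output. The following CPU-side sketch illustrates the two-stage idea only; the function and variable names are made up for this note and do not appear in the diff.

    # stage 1: every "block" serially folds a strided slice of the input into one partial
    # stage 2: the per-block partials are reduced again into the final result
    function two_stage_reduce(op, xs, neutral; nblocks = 4)
        partials = fill(neutral, nblocks)
        for b in 1:nblocks
            i = b
            while i <= length(xs)                    # grid-stride style traversal
                partials[b] = op(partials[b], xs[i])
                i += nblocks
            end
        end
        return reduce(op, partials; init = neutral)  # second, much smaller reduction
    end

    two_stage_reduce(+, collect(1:10), 0)            # == 55

On the GPU, stage 1 is the `partial_mapreduce_grid` launch shown below and stage 2 is the recursive `GPUArrays.mapreducedim!` call at the end of the diff.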

src/mapreduce.jl

Lines changed: 66 additions & 25 deletions
@@ -80,40 +80,49 @@ end
     return val
 end
 
-# Reduce an array across the grid. All elements to be processed can be addressed by the
-# product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
-# singleton entries for the dimensions that should be reduced (and vice versa).
-function mapreduce_grid(f, op, A, R, neutral, Rreduce, Rother, shuffle)
+# Partially reduce an array across the grid. The reduction is partial, with multiple
+# blocks `gridDim_reduce` working on reducing data from `A` and writing it to multiple
+# outputs in `R`. All elements to be processed can be addressed by the product of the
+# two iterators `Rreduce` and `Rother`, where the latter iterator will have singleton
+# entries for the dimensions that should be reduced (and vice versa). The output array
+# is expected to have an additional dimension sized to the number of reduced values
+# per reduction (i.e. more than one if multiple blocks participate).
+function partial_mapreduce_grid(f, op, A, R, neutral, Rreduce, Rother, gridDim_reduce, shuffle)
+    # decompose the 1D hardware indices into separate ones for reduction (across threads
+    # and possibly blocks if it doesn't fit) and other elements (remaining blocks)
+    threadIdx_reduce = threadIdx().x
+    blockDim_reduce = blockDim().x
+    blockIdx_other, blockIdx_reduce = fldmod1(blockIdx().x, gridDim_reduce)
+
     # block-based indexing into the values outside of the reduction dimension
     # (that means we can safely synchronize threads within this block)
-    iother = blockIdx().x
+    iother = blockIdx_other
     @inbounds if iother <= length(Rother)
         Iother = Rother[iother]
 
         # load the neutral value
-        Iout = Iother
+        Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
         neutral = if neutral === nothing
             R[Iout]
         else
             neutral
         end
-
+
         val = op(neutral, neutral)
 
-        # reduce serially across chunks of input vector that don't fit in a block
-        ireduce = threadIdx().x
+        # get a value that should be reduced
+        ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
         while ireduce <= length(Rreduce)
             Ireduce = Rreduce[ireduce]
             J = max(Iother, Ireduce)
             val = op(val, f(A[J]))
-            ireduce += blockDim().x
+            ireduce += blockDim_reduce * gridDim_reduce
         end
 
-        # reduce in parallel within the current block
         val = reduce_block(op, val, neutral, shuffle)
 
         # write back to memory
-        if threadIdx().x == 1
+        if threadIdx_reduce == 1
             R[Iout] = val
         end
     end
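
The kernel above packs two levels of parallelism into the 1D block index: `fldmod1` splits it into an output-slice index (`blockIdx_other`) and the block's position within the group that cooperates on one reduction (`blockIdx_reduce`); each thread then walks `Rreduce` with a stride of `blockDim_reduce * gridDim_reduce`. A small host-side illustration with made-up launch numbers (not taken from the diff):

    # assume 4 blocks cooperate per reduction and 256 threads per block (hypothetical values)
    gridDim_reduce  = 4
    blockDim_reduce = 256

    # the 7th block of the grid serves the 2nd output slice as its 3rd reduction block
    blockIdx_other, blockIdx_reduce = fldmod1(7, gridDim_reduce)           # (2, 3)

    # its first thread starts at element 513 of Rreduce ...
    threadIdx_reduce = 1
    ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce   # 513

    # ... and then visits 1537, 2561, ... by stepping over all cooperating threads
    stride = blockDim_reduce * gridDim_reduce                              # 1024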
@@ -133,7 +142,8 @@ NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, A::AbstractAr
     # be conservative about using shuffle instructions
     shuffle = true
     shuffle &= capability(device()) >= v"3.0"
-    shuffle &= T in (Bool, Int32, Int64, Float32, Float64, ComplexF32, ComplexF64)
+    shuffle &= T in (Int32, Int64, Float32, Float64, ComplexF32, ComplexF64)
+    # TODO: add support for Bool (CUDAnative.jl#420)
 
     # iteration domain, split in two: one part covers the dimensions that should
     # be reduced, and the other covers the rest. combining both covers all values.
@@ -144,21 +154,52 @@ NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, A::AbstractAr
     # CartesianIndices object with UnitRanges that behave badly on the GPU.
     @assert length(Rall) == length(Rother) * length(Rreduce)
 
-    function configurator(kernel)
-        config = launch_configuration(kernel.fun)
-        dev = device()
-
-        threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
-        if threads > config.threads
-            threads = shuffle ? prevwarp(dev, config.threads) : prevpow(2, config.threads)
+    # allocate an additional, empty dimension to write the reduced value to.
+    # this does not affect the actual location in memory of the final values,
+    # but allows us to write a generalized kernel supporting partial reductions.
+    R′ = reshape(R, (size(R)..., 1))
+
+    # determine how many threads we can launch
+    args = (f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
+    kernel_args = cudaconvert.(args)
+    kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
+    kernel = cufunction(partial_mapreduce_grid, kernel_tt)
+    kernel_config =
+        launch_configuration(kernel.fun; shmem = shuffle ? 0 : threads->2*threads*sizeof(T))
+
+    # determine the launch configuration
+    dev = device()
+    reduce_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
+    if reduce_threads > kernel_config.threads
+        reduce_threads = shuffle ? prevwarp(dev, kernel_config.threads) : prevpow(2, kernel_config.threads)
+    end
+    reduce_blocks = min(reduce_threads, cld(length(Rreduce), reduce_threads))
+    other_blocks = length(Rother)
+    threads, blocks = reduce_threads, reduce_blocks*other_blocks
+    shmem = shuffle ? 0 : 2*threads*sizeof(T)
+
+    # perform the actual reduction
+    if reduce_blocks == 1
+        # we can cover the dimensions to reduce using a single block
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
+    else
+        # we need multiple steps to cover all values to reduce
+        partial = similar(R, (size(R)..., reduce_blocks))
+        if init === nothing
+            # without an explicit initializer we need to copy from the output container
+            sz = prod(size(R))
+            for i in 1:reduce_blocks
+                # TODO: async copies (or async fill!, but then we'd need to load first)
+                # or maybe just broadcast since that extends singleton dimensions
+                copyto!(partial, (i-1)*sz+1, R, 1, sz)
+            end
         end
-        blocks = length(Rother)
-        shmem = shuffle ? 0 : 2*threads*sizeof(T)
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, A, partial, init, Rreduce, Rother, reduce_blocks, Val(shuffle))
 
-        return (threads=threads, blocks=blocks, shmem=shmem)
+        GPUArrays.mapreducedim!(identity, op, R′, partial, init)
     end
 
-    @cuda config=configurator mapreduce_grid(f, op, A, R, init, Rreduce, Rother, Val(shuffle))
-
     return R
 end
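
The host side above rounds the reduction length up to warp granularity (`nextwarp`) when shuffle is usable, or to a power of two otherwise, clamps it to the limit reported by `launch_configuration`, and covers whatever remains with extra blocks per reduction. The following rough recreation of that arithmetic uses a hypothetical reduction of length 4096 over 128 output slices and assumes a warp size of 32 and an occupancy limit of 1024 threads; none of these numbers come from the diff.

    warpsize    = 32
    max_threads = 1024        # stand-in for launch_configuration(kernel.fun).threads

    len_reduce = 4096         # length(Rreduce)
    len_other  = 128          # length(Rother)

    # round up to a whole number of warps (what nextwarp does), then clamp to the limit
    reduce_threads = min(cld(len_reduce, warpsize) * warpsize, max_threads)  # 1024

    # one block cannot cover 4096 elements, so several blocks cooperate per reduction
    reduce_blocks = min(reduce_threads, cld(len_reduce, reduce_threads))     # 4
    other_blocks  = len_other                                                # 128

    threads, blocks = reduce_threads, reduce_blocks * other_blocks           # 1024, 512

With `reduce_blocks == 4` in this example, the kernel writes into a `(size(R)..., 4)` `partial` array and the final `GPUArrays.mapreducedim!(identity, op, R′, partial, init)` call collapses that extra dimension into `R`.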
