This repository was archived by the owner on Mar 12, 2021. It is now read-only.

Commit 454afc3

Reimplement mapreduce.

1 parent: 2093291

File tree: 5 files changed (+193, -155 lines)

Manifest.toml (5 additions, 5 deletions)

@@ -39,9 +39,9 @@ uuid = "c5f51814-7f29-56b8-a69c-e4d8f6be1fde"
 version = "6.0.0"
 
 [[CUDAnative]]
-deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "Pkg", "Printf", "TimerOutputs"]
-git-tree-sha1 = "f5863b95140711306d1d6b31ada234d7bfea2416"
-repo-rev = "7c1311b19a0f1eda7e0de724f2fd5f27b70daf1e"
+deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "MacroTools", "Pkg", "Printf", "TimerOutputs"]
+git-tree-sha1 = "f9e832773afbb4b2e7345153396a153c5f8d5a3d"
+repo-rev = "a572eb43a09d55ea23166c0b5a36b70a301293d3"
 repo-url = "https://github.com/JuliaGPU/CUDAnative.jl.git"
 uuid = "be33ccc6-a3ff-5ff2-a52e-74243cff1e17"
 version = "2.10.2"
@@ -62,8 +62,8 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 
 [[GPUArrays]]
 deps = ["AbstractFFTs", "Adapt", "LinearAlgebra", "Printf", "Random", "Serialization"]
-git-tree-sha1 = "7eefc8f91796ee7a657641824a8d4eda9d81e93f"
-repo-rev = "02b3fb82f06c741c7542e331022463683c01c6f5"
+git-tree-sha1 = "6aeda18f125c5b9599c0de40865b67319b69c9b8"
+repo-rev = "d6e37deb13c3202947765f8d574c4f9a1b511012"
 repo-url = "https://github.com/JuliaGPU/GPUArrays.jl.git"
 uuid = "0c68f7d7-f131-5f86-a1c3-88cf8149b2d7"
 version = "2.0.1"
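Not part of the commit, but for context: the Manifest changes above pin the environment to unreleased revisions of CUDAnative.jl (which now also depends on MacroTools) and GPUArrays.jl. A minimal sketch of how such pins are produced, assuming a standard Pkg workflow and using the revisions recorded in the diff:

using Pkg
# Track a specific git revision; when the environment resolves, Pkg records the
# matching repo-rev and git-tree-sha1 entries in Manifest.toml.
Pkg.add(PackageSpec(url="https://github.com/JuliaGPU/CUDAnative.jl.git",
                    rev="a572eb43a09d55ea23166c0b5a36b70a301293d3"))
Pkg.add(PackageSpec(url="https://github.com/JuliaGPU/GPUArrays.jl.git",
                    rev="d6e37deb13c3202947765f8d574c4f9a1b511012"))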

src/accumulate.jl (3 additions, 27 deletions)

@@ -1,30 +1,5 @@
 # scan and accumulate
 
-# FIXME: certain Base operations, like accumulate, don't allow to pass a neutral element
-# since it is not required by the Base implementation (as opposed to eg. reduce).
-# to stick to the API, we use global state to provide a callback.
-const neutral_elements = Dict{Function,Function}(
-    Base.:(+)     => zero,
-    Base.add_sum  => zero,
-    Base.:(*)     => one,
-    Base.mul_prod => one,
-)
-function neutral_element!(op, f)
-    if haskey(neutral_elements, op)
-        @warn "Overriding neutral element for $op"
-    end
-    neutral_elements[op] = f
-end
-function neutral_element(op, T)
-    if !haskey(neutral_elements, op)
-        error("""CuArrays.jl needs to know the neutral element for your operator $op.
-                 Please register your operator using: `CuArrays.neutral_element!($op, f::Function)`,
-                 providing a function that returns a neutral element for a given element type.""")
-    end
-    f = neutral_elements[op]
-    return f(T)
-end
-
 ## COV_EXCL_START
 
 # partial scan of individual thread blocks within a grid
@@ -152,14 +127,15 @@ end
 ## COV_EXCL_STOP
 
 function scan!(f::Function, output::CuArray{T}, input::CuArray;
-               dims::Integer, init=nothing, neutral=neutral_element(f, T)) where {T}
-
+               dims::Integer, init=nothing, neutral=GPUArrays.neutral_element(f, T)) where {T}
     dims > 0 || throw(ArgumentError("dims must be a positive integer"))
     inds_t = axes(input)
     axes(output) == inds_t || throw(DimensionMismatch("shape of B must match A"))
     dims > ndims(input) && return copyto!(output, input)
     isempty(inds_t[dims]) && return output
 
+    f = cufunc(f)
+
     # iteration domain across the main dimension
     Rdim = CartesianIndices((size(input, dims),))
 
165141

src/mapreduce.jl (184 additions, 90 deletions)

@@ -1,111 +1,205 @@
 ## COV_EXCL_START
 
-@inline function reduce_block(arr::CuDeviceArray, op)
-    sync_threads()
-    len = blockDim().x
-    while len != 1
-        sync_threads()
-        skip = (len + 1) >> 1
-        reduce_to = threadIdx().x - skip
-        if 0 < reduce_to <= (len >> 1)
-            arr[reduce_to] = op(arr[reduce_to], arr[threadIdx().x])
-        end
-        len = skip
+# TODO
+# - serial version for lower latency
+# - block-stride loop to delay need for second kernel launch
+
+# Reduce a value across a warp
+@inline function reduce_warp(op, val)
+    # offset = CUDAnative.warpsize() ÷ 2
+    # while offset > 0
+    #    val = op(val, shfl_down_sync(0xffffffff, val, offset))
+    #    offset ÷= 2
+    # end
+
+    # Loop unrolling for warpsize = 32
+    val = op(val, shfl_down_sync(0xffffffff, val, 16, 32))
+    val = op(val, shfl_down_sync(0xffffffff, val, 8, 32))
+    val = op(val, shfl_down_sync(0xffffffff, val, 4, 32))
+    val = op(val, shfl_down_sync(0xffffffff, val, 2, 32))
+    val = op(val, shfl_down_sync(0xffffffff, val, 1, 32))
+
+    return val
+end
+
+# Reduce a value across a block, using shared memory for communication
+@inline function reduce_block(op, val::T, neutral, shuffle::Val{true}) where T
+    # shared mem for 32 partial sums
+    shared = @cuStaticSharedMem(T, 32)  # NOTE: this is an upper bound; better detect it
+
+    wid, lane = fldmod1(threadIdx().x, CUDAnative.warpsize())
+
+    # each warp performs partial reduction
+    val = reduce_warp(op, val)
+
+    # write reduced value to shared memory
+    if lane == 1
+        @inbounds shared[wid] = val
     end
+
+    # wait for all partial reductions
     sync_threads()
-end
 
-function mapreducedim_kernel_parallel(f, op, R::CuDeviceArray{T}, A::CuDeviceArray{T},
-                                      CIS, Rlength, Slength) where {T}
-    for Ri_base in 0:(gridDim().x * blockDim().y):(Rlength-1)
-        Ri = Ri_base + (blockIdx().x - 1) * blockDim().y + threadIdx().y
-        Ri > Rlength && return
-        RI = Tuple(CartesianIndices(R)[Ri])
-        S = @cuStaticSharedMem(T, 512)
-        Si_folded_base = (threadIdx().y - 1) * blockDim().x
-        Si_folded = Si_folded_base + threadIdx().x
-        # serial reduction of A into S by Slength ÷ xthreads
-        for Si_base in 0:blockDim().x:(Slength-1)
-            Si = Si_base + threadIdx().x
-            Si > Slength && break
-            SI = Tuple(CIS[Si])
-            AI = ifelse.(size(R) .== 1, SI, RI)
-            if Si_base == 0
-                S[Si_folded] = f(A[AI...])
-            else
-                S[Si_folded] = op(S[Si_folded], f(A[AI...]))
-            end
-        end
-        # block-parallel reduction of S to S[1] by xthreads
-        reduce_block(view(S, (Si_folded_base + 1):512), op)
-        # reduce S[1] into R
-        threadIdx().x == 1 && (R[Ri] = op(R[Ri], S[Si_folded]))
+    # read from shared memory only if that warp existed
+    val = if threadIdx().x <= fld1(blockDim().x, CUDAnative.warpsize())
+        @inbounds shared[lane]
+    else
+        neutral
     end
-    return
+
+    # final reduce within first warp
+    if wid == 1
+        val = reduce_warp(op, val)
+    end
+
+    return val
 end
+@inline function reduce_block(op, val::T, neutral, shuffle::Val{false}) where T
+    threads = blockDim().x
+    thread = threadIdx().x
 
-## COV_EXCL_STOP
+    # shared mem for a complete reduction
+    shared = @cuDynamicSharedMem(T, (2*threads,))
+    @inbounds shared[thread] = val
+
+    # perform a reduction
+    d = threads>>1
+    while d > 0
+        sync_threads()
+        if thread <= d
+            shared[thread] = op(shared[thread], shared[thread+d])
+        end
+        d >>= 1
+    end
 
-function Base._mapreducedim!(f, op, R::CuArray{T}, A::CuArray{T}) where {T}
-    # the kernel as generated from `f` and `op` can require lots of registers (eg. #160),
-    # so we need to be careful about how many threads we launch not to run out of them.
-    Rlength = length(R)
-    Ssize = ifelse.(size(R) .== 1, size(A), 1)
-    Slength = prod(Ssize)
-    CIS = CartesianIndices(Ssize)
-
-    parallel_args = (f, op, R, A, CIS, Rlength, Slength)
-    GC.@preserve parallel_args begin
-        parallel_kargs = cudaconvert.(parallel_args)
-        parallel_tt = Tuple{Core.Typeof.(parallel_kargs)...}
-        parallel_kernel = cufunction(mapreducedim_kernel_parallel, parallel_tt)
-
-        # we are limited in how many threads we can launch...
-        ## by the kernel
-        kernel_threads = CUDAnative.maxthreads(parallel_kernel)
-        ## by the device
-        dev = CUDAdrv.device()
-        block_threads = (x=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_X),
-                         y=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_Y),
-                         total=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_THREADS_PER_BLOCK))
-
-        # figure out a legal launch configuration
-        y_thr = min(nextpow(2, Rlength ÷ 512 + 1), 512, block_threads.y, kernel_threads)
-        x_thr = min(512 ÷ y_thr, Slength, block_threads.x,
-                    ceil(Int, block_threads.total/y_thr),
-                    ceil(Int, kernel_threads/y_thr))
-
-        blk, thr = (Rlength - 1) ÷ y_thr + 1, (x_thr, y_thr, 1)
-        parallel_kernel(parallel_kargs...; threads=thr, blocks=blk)
+    # load the final value on the first thread
+    if thread == 1
+        val = @inbounds shared[thread]
     end
 
-    return R
+    return val
 end
 
-import Base.minimum, Base.maximum, Base.reduce
+# Partially reduce an array across the grid. The reduction is partial, with multiple
+# blocks `gridDim_reduce` working on reducing data from `A` and writing it to multiple
+# outputs in `R`. All elements to be processed can be addressed by the product of the
+# two iterators `Rreduce` and `Rother`, where the latter iterator will have singleton
+# entries for the dimensions that should be reduced (and vice versa). The output array
+# is expected to have an additional dimension with as size the number of reduced values
+# for every reduction (i.e. more than one if there's multiple blocks participating).
+function partial_mapreduce_grid(f, op, A, R, neutral, Rreduce, Rother, gridDim_reduce, shuffle)
+    # decompose the 1D hardware indices into separate ones for reduction (across threads
+    # and possibly blocks if it doesn't fit) and other elements (remaining blocks)
+    threadIdx_reduce = threadIdx().x
+    blockDim_reduce = blockDim().x
+    blockIdx_other, blockIdx_reduce = fldmod1(blockIdx().x, gridDim_reduce)
+
+    # block-based indexing into the values outside of the reduction dimension
+    # (that means we can safely synchronize threads within this block)
+    iother = blockIdx_other
+    @inbounds if iother <= length(Rother)
+        Iother = Rother[iother]
+
+        # load the neutral value
+        Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
+        neutral = if neutral === nothing
+            R[Iout]
+        else
+            neutral
+        end
 
-_reduced_dims(x::CuArray, ::Colon) = Tuple(Base.ones(Int, ndims(x)))
-_reduced_dims(x::CuArray, dims) = Base.reduced_indices(x, dims)
+        # get a value that should be reduced
+        ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
+        val = if ireduce <= length(Rreduce)
+            Ireduce = Rreduce[ireduce]
+            J = max(Iother, Ireduce)
+            f(A[J])
+        else
+            neutral
+        end
+        val = op(val, neutral)
 
-_initarray(x::CuArray{T}, dims, init) where {T} = fill!(similar(x, T, _reduced_dims(x, dims)), init)
+        val = reduce_block(op, val, neutral, shuffle)
 
-function _reduce(op, x::CuArray, init, ::Colon)
-    mx = _initarray(x, :, init)
-    Base._mapreducedim!(identity, op, mx, x)
-    collect(mx)[1]
-end
+        # write back to memory
+        if threadIdx_reduce == 1
+            R[Iout] = val
+        end
+    end
 
-function _reduce(op, x::CuArray, init, dims)
-    mx = _initarray(x, dims, init)
-    Base._mapreducedim!(identity, op, mx, x)
+    return
 end
 
-"""
-    reduce(op, x::CuArray; dims=:, init)
+## COV_EXCL_STOP
 
-The initial value `init` is mandatory for `reduce` on `CuArray`'s. It must be a neutral element for `op`.
-"""
-reduce(op, x::CuArray; dims=:, init) = _reduce(op, x, init, dims)
+NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, A::AbstractArray, init=nothing) where T
+    Base.check_reducedims(R, A)
+    isempty(A) && return R
+
+    f = cufunc(f)
+    op = cufunc(op)
+
+    # be conservative about using shuffle instructions
+    shuffle = true
+    shuffle &= capability(device()) >= v"3.0"
+    shuffle &= T in (Int32, Int64, Float32, Float64, ComplexF32, ComplexF64)
+    # TODO: add support for Bool (CUDAnative.jl#420)
+
+    # iteration domain, split in two: one part covers the dimensions that should
+    # be reduced, and the other covers the rest. combining both covers all values.
+    Rall = CartesianIndices(A)
+    Rother = CartesianIndices(R)
+    Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A)))
+    # NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
+    # CartesianIndices object with UnitRanges that behave badly on the GPU.
+    @assert length(Rall) == length(Rother) * length(Rreduce)
+
+    # allocate an additional, empty dimension to write the reduced value to.
+    # this does not affect the actual location in memory of the final values,
+    # but allows us to write a generalized kernel supporting partial reductions.
+    R′ = reshape(R, (size(R)..., 1))
+
+    # determine how many threads we can launch
+    args = (f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
+    kernel_args = cudaconvert.(args)
+    kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
+    kernel = cufunction(partial_mapreduce_grid, kernel_tt)
+    kernel_config =
+        launch_configuration(kernel.fun; shmem = shuffle ? 0 : threads->2*threads*sizeof(T))
+
+    # determine the launch configuration
+    dev = device()
+    reduce_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
+    if reduce_threads > kernel_config.threads
+        reduce_threads = shuffle ? prevwarp(dev, kernel_config.threads) : prevpow(2, kernel_config.threads)
+    end
+    reduce_blocks = cld(length(Rreduce), reduce_threads)
+    other_blocks = length(Rother)
+    threads, blocks = reduce_threads, reduce_blocks*other_blocks
+    shmem = shuffle ? 0 : 2*threads*sizeof(T)
+
+    # perform the actual reduction
+    if reduce_blocks == 1
+        # we can cover the dimensions to reduce using a single block
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
+    else
+        # we need multiple steps to cover all values to reduce
+        partial = similar(R, (size(R)..., reduce_blocks))
+        if init === nothing
+            # without an explicit initializer we need to copy from the output container
+            sz = prod(size(R))
+            for i in 1:reduce_blocks
+                # TODO: async copies (or async fill!, but then we'd need to load first)
+                # or maybe just broadcast since that extends singleton dimensions
+                copyto!(partial, (i-1)*sz+1, R, 1, sz)
+            end
+        end
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, A, partial, init, Rreduce, Rother, reduce_blocks, Val(shuffle))
 
-minimum(x::CuArray{T}; dims=:) where {T} = _reduce(min, x, typemax(T), dims)
-maximum(x::CuArray{T}; dims=:) where {T} = _reduce(max, x, typemin(T), dims)
+        GPUArrays.mapreducedim!(identity, op, R′, partial, init)
+    end
+
+    return R
+end
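A usage sketch, not part of the commit: with this reimplementation, reductions on a CuArray reach partial_mapreduce_grid through GPUArrays.mapreducedim!, which picks between the warp-shuffle and shared-memory block reductions and only launches a second pass when one block per output slice cannot cover the reduction dimension. The sizes below are arbitrary, and the high-level calls assume the pinned GPUArrays revision forwards Base's reduction machinery to this method:

using CuArrays, GPUArrays

A = CuArrays.rand(Float32, 1024, 1024)

sum(A)           # full reduction down to a scalar
sum(A; dims=2)   # reduce along the second dimension; returns a 1024×1 CuArray

# Calling the new entry point directly, mirroring Base.mapreducedim! semantics:
# with no explicit `init`, the contents of R provide the starting values.
R = CuArrays.zeros(Float32, 1024, 1)
GPUArrays.mapreducedim!(abs2, +, R, A)   # sum of squares of each row, written into R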
