This repository was archived by the owner on Mar 12, 2021. It is now read-only.

Reimplement mapreduce. #602

Merged
merged 1 commit on Feb 25, 2020
10 changes: 5 additions & 5 deletions Manifest.toml
@@ -39,9 +39,9 @@ uuid = "c5f51814-7f29-56b8-a69c-e4d8f6be1fde"
version = "6.0.0"

[[CUDAnative]]
deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "Pkg", "Printf", "TimerOutputs"]
git-tree-sha1 = "f5863b95140711306d1d6b31ada234d7bfea2416"
repo-rev = "7c1311b19a0f1eda7e0de724f2fd5f27b70daf1e"
deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "MacroTools", "Pkg", "Printf", "TimerOutputs"]
git-tree-sha1 = "5beaa8e6a8fb65003b7b2555a73e7666468b22f6"
repo-rev = "46d5331"
repo-url = "https://github.com/JuliaGPU/CUDAnative.jl.git"
uuid = "be33ccc6-a3ff-5ff2-a52e-74243cff1e17"
version = "2.10.2"
@@ -62,8 +62,8 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[[GPUArrays]]
deps = ["AbstractFFTs", "Adapt", "LinearAlgebra", "Printf", "Random", "Serialization"]
git-tree-sha1 = "7eefc8f91796ee7a657641824a8d4eda9d81e93f"
repo-rev = "02b3fb82f06c741c7542e331022463683c01c6f5"
git-tree-sha1 = "6aeda18f125c5b9599c0de40865b67319b69c9b8"
repo-rev = "d6e37deb13c3202947765f8d574c4f9a1b511012"
repo-url = "https://github.com/JuliaGPU/GPUArrays.jl.git"
uuid = "0c68f7d7-f131-5f86-a1c3-88cf8149b2d7"
version = "2.0.1"
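Note on the Manifest changes: both CUDAnative.jl and GPUArrays.jl are pinned to development revisions rather than released versions. As a hedged sketch (not part of this PR), the same pins could be reproduced in a local environment with Pkg, using the URLs and `repo-rev` values recorded above:

```julia
using Pkg

# Sketch only: pin the two dependencies to the revisions recorded in the Manifest.
Pkg.add(PackageSpec(url="https://github.com/JuliaGPU/CUDAnative.jl.git", rev="46d5331"))
Pkg.add(PackageSpec(url="https://github.com/JuliaGPU/GPUArrays.jl.git",
                    rev="d6e37deb13c3202947765f8d574c4f9a1b511012"))
```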
30 changes: 3 additions & 27 deletions src/accumulate.jl
@@ -1,30 +1,5 @@
# scan and accumulate

-# FIXME: certain Base operations, like accumulate, don't allow to pass a neutral element
-# since it is not required by the Base implementation (as opposed to eg. reduce).
-# to stick to the API, we use global state to provide a callback.
-const neutral_elements = Dict{Function,Function}(
-Base.:(+) => zero,
-Base.add_sum => zero,
-Base.:(*) => one,
-Base.mul_prod => one,
-)
-function neutral_element!(op, f)
-if haskey(neutral_elements, op)
-@warn "Overriding neutral element for $op"
-end
-neutral_elements[op] = f
-end
-function neutral_element(op, T)
-if !haskey(neutral_elements, op)
-error("""CuArrays.jl needs to know the neutral element for your operator $op.
-Please register your operator using: `CuArrays.neutral_element!($op, f::Function)`,
-providing a function that returns a neutral element for a given element type.""")
-end
-f = neutral_elements[op]
-return f(T)
-end

## COV_EXCL_START

# partial scan of individual thread blocks within a grid
@@ -152,14 +127,15 @@ end
## COV_EXCL_STOP

function scan!(f::Function, output::CuArray{T}, input::CuArray;
-dims::Integer, init=nothing, neutral=neutral_element(f, T)) where {T}
+dims::Integer, init=nothing, neutral=GPUArrays.neutral_element(f, T)) where {T}
dims > 0 || throw(ArgumentError("dims must be a positive integer"))
inds_t = axes(input)
axes(output) == inds_t || throw(DimensionMismatch("shape of B must match A"))
dims > ndims(input) && return copyto!(output, input)
isempty(inds_t[dims]) && return output

+f = cufunc(f)

# iteration domain across the main dimension
Rdim = CartesianIndices((size(input, dims),))

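For context on the accumulate.jl change: the operator-to-neutral-element registry that used to live in CuArrays is removed, and `scan!` now obtains the neutral element from `GPUArrays.neutral_element(f, T)`. The sketch below only illustrates what a neutral element is; it is not the GPUArrays implementation, and the `neutral` helper is hypothetical.

```julia
# Hypothetical illustration: a neutral element is a value `e` with op(e, x) == x
# for every x of the requested type. The + and * mappings mirror the ones the
# removed CuArrays registry used; max and min follow the typemin/typemax pattern.
neutral(::typeof(+), ::Type{T}) where {T} = zero(T)
neutral(::typeof(*), ::Type{T}) where {T} = one(T)
neutral(::typeof(max), ::Type{T}) where {T} = typemin(T)
neutral(::typeof(min), ::Type{T}) where {T} = typemax(T)

neutral(+, Float32)  # 0.0f0
neutral(max, Int64)  # typemin(Int64)
```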
274 changes: 184 additions & 90 deletions src/mapreduce.jl
@@ -1,111 +1,205 @@
## COV_EXCL_START

@inline function reduce_block(arr::CuDeviceArray, op)
sync_threads()
len = blockDim().x
while len != 1
sync_threads()
skip = (len + 1) >> 1
reduce_to = threadIdx().x - skip
if 0 < reduce_to <= (len >> 1)
arr[reduce_to] = op(arr[reduce_to], arr[threadIdx().x])
end
len = skip
# TODO
# - serial version for lower latency
# - block-stride loop to delay need for second kernel launch

# Reduce a value across a warp
@inline function reduce_warp(op, val)
# offset = CUDAnative.warpsize() ÷ 2
# while offset > 0
# val = op(val, shfl_down_sync(0xffffffff, val, offset))
# offset ÷= 2
# end

# Loop unrolling for warpsize = 32
val = op(val, shfl_down_sync(0xffffffff, val, 16, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 8, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 4, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 2, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 1, 32))

return val
end
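Editor's note: the unrolled `shfl_down_sync` sequence above is a tree reduction inside one warp. At each step a lane combines its value with the value held by the lane `offset` positions above it, halving the number of useful lanes until lane 1 holds the result. The CPU model below is only an illustration of that data flow, assuming a warp size of 32; `shfl_down_sync` itself is a hardware intrinsic.

```julia
# CPU model of the warp-level tree reduction (illustration only). `lanes` stands in
# for the per-lane register values of one full warp.
function simulate_warp_reduce(op, lanes::AbstractVector)
    vals = collect(lanes)
    @assert length(vals) == 32            # model assumes a full warp
    offset = 16                           # warpsize ÷ 2, matching the unrolled kernel
    while offset > 0
        for lane in 1:32
            # shfl_down: combine with the value `offset` lanes above, if it exists
            if lane + offset <= 32
                vals[lane] = op(vals[lane], vals[lane + offset])
            end
        end
        offset >>= 1
    end
    return vals[1]                        # lane 1 ends up with the full reduction
end

simulate_warp_reduce(+, 1:32)  # == sum(1:32) == 528
```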

# Reduce a value across a block, using shared memory for communication
@inline function reduce_block(op, val::T, neutral, shuffle::Val{true}) where T
# shared mem for 32 partial sums
shared = @cuStaticSharedMem(T, 32) # NOTE: this is an upper bound; better detect it

wid, lane = fldmod1(threadIdx().x, CUDAnative.warpsize())

# each warp performs partial reduction
val = reduce_warp(op, val)

# write reduced value to shared memory
if lane == 1
@inbounds shared[wid] = val
end

# wait for all partial reductions
sync_threads()
end

function mapreducedim_kernel_parallel(f, op, R::CuDeviceArray{T}, A::CuDeviceArray{T},
CIS, Rlength, Slength) where {T}
for Ri_base in 0:(gridDim().x * blockDim().y):(Rlength-1)
Ri = Ri_base + (blockIdx().x - 1) * blockDim().y + threadIdx().y
Ri > Rlength && return
RI = Tuple(CartesianIndices(R)[Ri])
S = @cuStaticSharedMem(T, 512)
Si_folded_base = (threadIdx().y - 1) * blockDim().x
Si_folded = Si_folded_base + threadIdx().x
# serial reduction of A into S by Slength ÷ xthreads
for Si_base in 0:blockDim().x:(Slength-1)
Si = Si_base + threadIdx().x
Si > Slength && break
SI = Tuple(CIS[Si])
AI = ifelse.(size(R) .== 1, SI, RI)
if Si_base == 0
S[Si_folded] = f(A[AI...])
else
S[Si_folded] = op(S[Si_folded], f(A[AI...]))
end
end
# block-parallel reduction of S to S[1] by xthreads
reduce_block(view(S, (Si_folded_base + 1):512), op)
# reduce S[1] into R
threadIdx().x == 1 && (R[Ri] = op(R[Ri], S[Si_folded]))
# read from shared memory only if that warp existed
val = if threadIdx().x <= fld1(blockDim().x, CUDAnative.warpsize())
@inbounds shared[lane]
else
neutral
end
return

# final reduce within first warp
if wid == 1
val = reduce_warp(op, val)
end

return val
end
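Editor's note: the shuffle-based `reduce_block` is a two-level scheme. Each warp reduces its 32 values with `reduce_warp`, lane 1 of every warp publishes its partial result into the 32-slot shared buffer (slots of absent warps read `neutral`), and the first warp reduces those partials. A rough CPU model of that structure, purely as an illustration:

```julia
# CPU model of the two-level (shuffle-based) block reduction. `Base.reduce` stands
# in for the warp-level shuffle reduction; `neutral` pads slots of absent warps.
function simulate_block_reduce_shuffle(op, vals::Vector{T}, neutral::T) where {T}
    shared = fill(neutral, 32)                        # mirrors the 32-slot shared buffer
    for w in 1:cld(length(vals), 32)                  # each warp reduces its own chunk
        warp = vals[(w - 1) * 32 + 1 : min(w * 32, length(vals))]
        shared[w] = reduce(op, warp)                  # the value lane 1 would publish
    end
    return reduce(op, shared)                         # final reduction by the first warp
end

simulate_block_reduce_shuffle(+, collect(1.0:128.0), 0.0)  # == sum(1:128) == 8256.0
```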
@inline function reduce_block(op, val::T, neutral, shuffle::Val{false}) where T
threads = blockDim().x
thread = threadIdx().x

## COV_EXCL_STOP
# shared mem for a complete reduction
shared = @cuDynamicSharedMem(T, (2*threads,))
@inbounds shared[thread] = val

# perform a reduction
d = threads>>1
while d > 0
sync_threads()
if thread <= d
shared[thread] = op(shared[thread], shared[thread+d])
end
d >>= 1
end

function Base._mapreducedim!(f, op, R::CuArray{T}, A::CuArray{T}) where {T}
# the kernel as generated from `f` and `op` can require lots of registers (eg. #160),
# so we need to be careful about how many threads we launch not to run out of them.
Rlength = length(R)
Ssize = ifelse.(size(R) .== 1, size(A), 1)
Slength = prod(Ssize)
CIS = CartesianIndices(Ssize)

parallel_args = (f, op, R, A, CIS, Rlength, Slength)
GC.@preserve parallel_args begin
parallel_kargs = cudaconvert.(parallel_args)
parallel_tt = Tuple{Core.Typeof.(parallel_kargs)...}
parallel_kernel = cufunction(mapreducedim_kernel_parallel, parallel_tt)

# we are limited in how many threads we can launch...
## by the kernel
kernel_threads = CUDAnative.maxthreads(parallel_kernel)
## by the device
dev = CUDAdrv.device()
block_threads = (x=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_X),
y=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_Y),
total=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_THREADS_PER_BLOCK))

# figure out a legal launch configuration
y_thr = min(nextpow(2, Rlength ÷ 512 + 1), 512, block_threads.y, kernel_threads)
x_thr = min(512 ÷ y_thr, Slength, block_threads.x,
ceil(Int, block_threads.total/y_thr),
ceil(Int, kernel_threads/y_thr))

blk, thr = (Rlength - 1) ÷ y_thr + 1, (x_thr, y_thr, 1)
parallel_kernel(parallel_kargs...; threads=thr, blocks=blk)
# load the final value on the first thread
if thread == 1
val = @inbounds shared[thread]
end

return R
return val
end
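Editor's note: the non-shuffle `reduce_block` is a classic shared-memory tree reduction. Every thread writes its value to shared memory, then the active half of the threads repeatedly folds the upper half into the lower half, separated by `sync_threads()`, until slot 1 holds the result. A CPU illustration of that folding, assuming (as the kernel does) a power-of-two thread count:

```julia
# CPU model of the shared-memory tree reduction (illustration only). Each slot
# stands in for one thread's entry in the shared buffer.
function simulate_block_reduce_shared(op, vals::Vector)
    shared = copy(vals)
    d = length(shared) >> 1                  # assumes a power-of-two length
    while d > 0
        for thread in 1:d                    # one sync_threads()-separated step
            shared[thread] = op(shared[thread], shared[thread + d])
        end
        d >>= 1
    end
    return shared[1]
end

simulate_block_reduce_shared(+, collect(1:8))  # == 36
```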

import Base.minimum, Base.maximum, Base.reduce
# Partially reduce an array across the grid. The reduction is partial, with multiple
# blocks `gridDim_reduce` working on reducing data from `A` and writing it to multiple
# outputs in `R`. All elements to be processed can be addressed by the product of the
# two iterators `Rreduce` and `Rother`, where the latter iterator will have singleton
# entries for the dimensions that should be reduced (and vice versa). The output array
# is expected to have an additional dimension with as size the number of reduced values
# for every reduction (i.e. more than one if there's multiple blocks participating).
function partial_mapreduce_grid(f, op, A, R, neutral, Rreduce, Rother, gridDim_reduce, shuffle)
# decompose the 1D hardware indices into separate ones for reduction (across threads
# and possibly blocks if it doesn't fit) and other elements (remaining blocks)
threadIdx_reduce = threadIdx().x
blockDim_reduce = blockDim().x
blockIdx_other, blockIdx_reduce = fldmod1(blockIdx().x, gridDim_reduce)

# block-based indexing into the values outside of the reduction dimension
# (that means we can safely synchronize threads within this block)
iother = blockIdx_other
@inbounds if iother <= length(Rother)
Iother = Rother[iother]

# load the neutral value
Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
neutral = if neutral === nothing
R[Iout]
else
neutral
end

_reduced_dims(x::CuArray, ::Colon) = Tuple(Base.ones(Int, ndims(x)))
_reduced_dims(x::CuArray, dims) = Base.reduced_indices(x, dims)
# get a value that should be reduced
ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
val = if ireduce <= length(Rreduce)
Ireduce = Rreduce[ireduce]
J = max(Iother, Ireduce)
f(A[J])
else
neutral
end
val = op(val, neutral)

_initarray(x::CuArray{T}, dims, init) where {T} = fill!(similar(x, T, _reduced_dims(x, dims)), init)
val = reduce_block(op, val, neutral, shuffle)

function _reduce(op, x::CuArray, init, ::Colon)
mx = _initarray(x, :, init)
Base._mapreducedim!(identity, op, mx, x)
collect(mx)[1]
end
# write back to memory
if threadIdx_reduce == 1
R[Iout] = val
end
end

function _reduce(op, x::CuArray, init, dims)
mx = _initarray(x, dims, init)
Base._mapreducedim!(identity, op, mx, x)
return
end

"""
reduce(op, x::CuArray; dims=:, init)
## COV_EXCL_STOP

The initial value `init` is mandatory for `reduce` on `CuArray`'s. It must be a neutral element for `op`.
"""
reduce(op, x::CuArray; dims=:, init) = _reduce(op, x, init, dims)
NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, A::AbstractArray, init=nothing) where T
Base.check_reducedims(R, A)
isempty(A) && return R

f = cufunc(f)
op = cufunc(op)

# be conservative about using shuffle instructions
shuffle = true
shuffle &= capability(device()) >= v"3.0"
shuffle &= T in (Int32, Int64, Float32, Float64, ComplexF32, ComplexF64)
# TODO: add support for Bool (CUDAnative.jl#420)

# iteration domain, split in two: one part covers the dimensions that should
# be reduced, and the other covers the rest. combining both covers all values.
Rall = CartesianIndices(A)
Rother = CartesianIndices(R)
Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A)))
# NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
# CartesianIndices object with UnitRanges that behave badly on the GPU.
@assert length(Rall) == length(Rother) * length(Rreduce)

# allocate an additional, empty dimension to write the reduced value to.
# this does not affect the actual location in memory of the final values,
# but allows us to write a generalized kernel supporting partial reductions.
R′ = reshape(R, (size(R)..., 1))

# determine how many threads we can launch
args = (f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
kernel_args = cudaconvert.(args)
kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
kernel = cufunction(partial_mapreduce_grid, kernel_tt)
kernel_config =
launch_configuration(kernel.fun; shmem = shuffle ? 0 : threads->2*threads*sizeof(T))

# determine the launch configuration
dev = device()
reduce_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
if reduce_threads > kernel_config.threads
reduce_threads = shuffle ? prevwarp(dev, kernel_config.threads) : prevpow(2, kernel_config.threads)
end
reduce_blocks = cld(length(Rreduce), reduce_threads)
other_blocks = length(Rother)
threads, blocks = reduce_threads, reduce_blocks*other_blocks
shmem = shuffle ? 0 : 2*threads*sizeof(T)

# perform the actual reduction
if reduce_blocks == 1
# we can cover the dimensions to reduce using a single block
@cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
else
# we need multiple steps to cover all values to reduce
partial = similar(R, (size(R)..., reduce_blocks))
if init === nothing
# without an explicit initializer we need to copy from the output container
sz = prod(size(R))
for i in 1:reduce_blocks
# TODO: async copies (or async fill!, but then we'd need to load first)
# or maybe just broadcast since that extends singleton dimensions
copyto!(partial, (i-1)*sz+1, R, 1, sz)
end
end
@cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
f, op, A, partial, init, Rreduce, Rother, reduce_blocks, Val(shuffle))

minimum(x::CuArray{T}; dims=:) where {T} = _reduce(min, x, typemax(T), dims)
maximum(x::CuArray{T}; dims=:) where {T} = _reduce(max, x, typemin(T), dims)
GPUArrays.mapreducedim!(identity, op, R′, partial, init)
end

return R
end
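Finally, a hedged usage sketch of what this reimplementation serves: Base reductions on a `CuArray` dispatch through the `GPUArrays.mapreducedim!` method above, which launches `partial_mapreduce_grid` once when a single block covers the reduced dimension, or twice when grid-wide partial results need a second reduction. The calls below assume the CuArrays API of this era and are illustrative rather than part of the PR.

```julia
using CuArrays

A = CuArrays.rand(Float32, 1024, 1024)

sum(A)                # full reduction to a scalar
sum(A; dims=2)        # partial reduction along dimension 2
sum(abs2, A; dims=1)  # the mapping function (here abs2) is fused into the same kernel
```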