Skip to content

Commit c109686

Browse files
committed
Add missing parallel=:threads implementations
These implementations are extremely basic, but they try to follow the patterns in the other parts of the Parallel module. My real motivation is to be able to move the Distributed implementations into an extension, so that Graphs.jl does not depend on Distributed.
1 parent 6130332 commit c109686

File tree

4 files changed

+249
-2
lines changed

4 files changed

+249
-2
lines changed

ext/GraphsDistributedExt.jl

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
module GraphsSharedArraysExt
2+
3+
using Graphs
4+
using SharedArrays: SharedArrays, SharedMatrix, SharedVector, sdata
5+
using SharedArrays.Distributed: @distributed
6+
7+
# betweenness
8+
function Graphs.distr_betweenness_centrality(
9+
g::AbstractGraph,
10+
vs=vertices(g),
11+
distmx::AbstractMatrix=weights(g);
12+
normalize=true,
13+
endpoints=false,
14+
)::Vector{Float64}
15+
n_v = nv(g)
16+
k = length(vs)
17+
isdir = is_directed(g)
18+
19+
# Parallel reduction
20+
21+
betweenness = @distributed (+) for s in vs
22+
temp_betweenness = zeros(n_v)
23+
if degree(g, s) > 0 # this might be 1?
24+
state = Graphs.dijkstra_shortest_paths(
25+
g, s, distmx; allpaths=true, trackvertices=true
26+
)
27+
if endpoints
28+
Graphs._accumulate_endpoints!(temp_betweenness, state, g, s)
29+
else
30+
Graphs._accumulate_basic!(temp_betweenness, state, g, s)
31+
end
32+
end
33+
temp_betweenness
34+
end
35+
36+
Graphs._rescale!(betweenness, n_v, normalize, isdir, k)
37+
38+
return betweenness
39+
end
40+
41+
# closeness
42+
function Graphs.distr_closeness_centrality(
43+
g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true
44+
)::Vector{Float64}
45+
n_v = Int(nv(g))
46+
closeness = SharedVector{Float64}(n_v)
47+
fill!(closeness, 0.0)
48+
49+
@sync @distributed for u in vertices(g)
50+
if degree(g, u) == 0 # no need to do Dijkstra here
51+
closeness[u] = 0.0
52+
else
53+
d = Graphs.dijkstra_shortest_paths(g, u, distmx).dists
54+
δ = filter(x -> x != typemax(x), d)
55+
σ = sum(δ)
56+
l = length(δ) - 1
57+
if σ > 0
58+
closeness[u] = l / σ
59+
if normalize
60+
n = l * 1.0 / (n_v - 1)
61+
closeness[u] *= n
62+
end
63+
else
64+
closeness[u] = 0.0
65+
end
66+
end
67+
end
68+
return sdata(closeness)
69+
end
70+
71+
# radiality
72+
function Graphs.distr_radiality_centrality(g::AbstractGraph)::Vector{Float64}
73+
n_v = nv(g)
74+
vs = vertices(g)
75+
n = ne(g)
76+
meandists = SharedVector{Float64}(Int(n_v))
77+
maxdists = SharedVector{Float64}(Int(n_v))
78+
79+
@sync @distributed for i in 1:n_v
80+
d = Graphs.dijkstra_shortest_paths(g, vs[i])
81+
maxdists[i] = maximum(d.dists)
82+
meandists[i] = sum(d.dists) / (n_v - 1)
83+
nothing
84+
end
85+
dmtr = maximum(maxdists)
86+
radialities = collect(meandists)
87+
return ((dmtr + 1) .- radialities) ./ dmtr
88+
end
89+
90+
# stress
91+
function Graphs.distr_stress_centrality(g::AbstractGraph, vs=vertices(g))::Vector{Int64}
92+
n_v = nv(g)
93+
k = length(vs)
94+
isdir = is_directed(g)
95+
96+
# Parallel reduction
97+
stress = @distributed (+) for s in vs
98+
temp_stress = zeros(Int64, n_v)
99+
if degree(g, s) > 0 # this might be 1?
100+
state = Graphs.dijkstra_shortest_paths(g, s; allpaths=true, trackvertices=true)
101+
Graphs._stress_accumulate_basic!(temp_stress, state, g, s)
102+
end
103+
temp_stress
104+
end
105+
return stress
106+
end
107+
108+
# generate_reduce
109+
function Graphs.distr_generate_reduce(
110+
g::AbstractGraph{T}, gen_func::Function, comp::Comp, reps::Integer
111+
) where {T<:Integer,Comp}
112+
# Type assert required for type stability
113+
min_set::Vector{T} = @distributed ((x, y) -> comp(x, y) ? x : y) for _ in 1:reps
114+
gen_func(g)
115+
end
116+
return min_set
117+
end
118+
119+
# eccentricity
120+
function Graphs.distr_eccentricity(
121+
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
122+
) where {T<:Number}
123+
vlen = length(vs)
124+
eccs = SharedVector{T}(vlen)
125+
@sync @distributed for i in 1:vlen
126+
d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
127+
eccs[i] = maximum(d.dists)
128+
end
129+
d = sdata(eccs)
130+
maximum(d) == typemax(T) && @warn("Infinite path length detected")
131+
return d
132+
end
133+
134+
# dijkstra shortest paths
135+
function Graphs.distr_dijkstra_shortest_paths(
136+
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
137+
) where {T<:Number} where {U}
138+
n_v = nv(g)
139+
r_v = length(sources)
140+
141+
# TODO: remove `Int` once julialang/#23029 / #23032 are resolved
142+
dists = SharedMatrix{T}(Int(r_v), Int(n_v))
143+
parents = SharedMatrix{U}(Int(r_v), Int(n_v))
144+
145+
@sync @distributed for i in 1:r_v
146+
state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx)
147+
dists[i, :] = state.dists
148+
parents[i, :] = state.parents
149+
end
150+
151+
result = MultipleDijkstraState(sdata(dists), sdata(parents))
152+
return result
153+
end
154+
155+
# random greedy color
156+
function Graphs.distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
157+
best = @distributed (Graphs.best_color) for i in 1:reps
158+
seq = shuffle(vertices(g))
159+
Graphs.perm_greedy_color(g, seq)
160+
end
161+
162+
return convert(Graphs.Coloring{T}, best)
163+
end
164+
165+
end

src/Parallel/distance.jl

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,44 @@
22

33
function eccentricity(
44
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
5+
; parallel=:distributed,
6+
) where {T<:Number}
7+
return if parallel === :threads
8+
threaded_eccentricity(g, vs, distmx)
9+
elseif parallel === :distributed
10+
distr_eccentricity(g, vs, distmx)
11+
else
12+
error("""Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')""")
13+
end
14+
end
15+
16+
function distr_eccentricity(
17+
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
518
) where {T<:Number}
619
vlen = length(vs)
720
eccs = SharedVector{T}(vlen)
821
@sync @distributed for i in 1:vlen
9-
eccs[i] = maximum(Graphs.dijkstra_shortest_paths(g, vs[i], distmx).dists)
22+
d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
23+
eccs[i] = maximum(d.dists)
1024
end
1125
d = sdata(eccs)
1226
maximum(d) == typemax(T) && @warn("Infinite path length detected")
1327
return d
1428
end
1529

30+
function threaded_eccentricity(
31+
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
32+
) where {T<:Number}
33+
vlen = length(vs)
34+
eccs = Vector{T}(vlen)
35+
Base.Threads.@threads for i in 1:vlen
36+
d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
37+
eccs[i] = maximum(d.dists)
38+
end
39+
maximum(eccs) == typemax(T) && @warn("Infinite path length detected")
40+
return eccs
41+
end
42+
1643
function eccentricity(g::AbstractGraph, distmx::AbstractMatrix)
1744
return eccentricity(g, vertices(g), distmx)
1845
end

src/Parallel/shortestpaths/dijkstra.jl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,39 @@ traversal information.
1818
"""
1919
function dijkstra_shortest_paths(
2020
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
21+
; parallel=:distributed,
22+
) where {T<:Number} where {U}
23+
return if parallel === :threads
24+
threaded_dijkstra_shortest_paths(g, sources, distmx)
25+
elseif parallel === :distributed
26+
distr_dijkstra_shortest_paths(g, sources, distmx)
27+
else
28+
error("""Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')""")
29+
end
30+
end
31+
32+
function threaded_dijkstra_shortest_paths(
33+
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
34+
) where {T<:Number} where {U}
35+
n_v = nv(g)
36+
r_v = length(sources)
37+
38+
# TODO: remove `Int` once julialang/#23029 / #23032 are resolved
39+
dists = Matrix{T}(Int(r_v), Int(n_v))
40+
parents = Matrix{U}(Int(r_v), Int(n_v))
41+
42+
Base.Threads.@threads for i in 1:r_v
43+
state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx)
44+
dists[i, :] = state.dists
45+
parents[i, :] = state.parents
46+
end
47+
48+
result = MultipleDijkstraState(dists, parents)
49+
return result
50+
end
51+
52+
function distr_dijkstra_shortest_paths(
53+
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
2154
) where {T<:Number} where {U}
2255
n_v = nv(g)
2356
r_v = length(sources)

src/Parallel/traversals/greedy_color.jl

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,26 @@
1-
function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
1+
function random_greedy_color(g::AbstractGraph{T}, reps::Integer;
2+
parallel=:distributed) where {T<:Integer}
3+
return if parallel === :threads
4+
threaded_random_greedy_color(g, reps)
5+
elseif parallel === :distributed
6+
distr_random_greedy_color(g, reps)
7+
else
8+
error("""Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')""")
9+
end
10+
end
11+
12+
function threaded_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
13+
local_best = Any[nothing for _ in 1:reps]
14+
Base.Threads.@threads for i in 1:reps
15+
seq = shuffle(vertices(g))
16+
local_best[t] = Graphs.perm_greedy_color(g, seq)
17+
end
18+
best = reduce(Graphs.best_color, local_best)
19+
20+
return convert(Graphs.Coloring{T}, best)
21+
end
22+
23+
function distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer}
224
best = @distributed (Graphs.best_color) for i in 1:reps
325
seq = shuffle(vertices(g))
426
Graphs.perm_greedy_color(g, seq)

0 commit comments

Comments
 (0)